KAFKA-17259: Support to override serverProperties and restart cluster in ClusterTestExtensions #20
base: trunk
Conversation
… in ClusterTestExtensions Signed-off-by: PoAn Yang <[email protected]>
Hi @FrankYang0529, please consider the case below; otherwise the cluster may hang. Thanks.
This PR is being marked as stale since it has not had any activity in 90 days. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
@FrankYang0529 could you please fix the conflicts?
Walkthrough

The changes introduce new restart and shutdown capabilities to the Kafka cluster test infrastructure.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Test as Test Method
    participant ClusterInstance
    participant RaftClusterInstance
    participant KafkaClusterTestKit
    Test->>ClusterInstance: restart(Map overrides)
    ClusterInstance->>RaftClusterInstance: restart(Map overrides)
    RaftClusterInstance->>KafkaClusterTestKit: restart(Map overrides)
    KafkaClusterTestKit->>KafkaClusterTestKit: shutdown()
    KafkaClusterTestKit->>KafkaClusterTestKit: Rebuild servers with overrides
    KafkaClusterTestKit->>KafkaClusterTestKit: startup()
```
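As a rough illustration of the API surface this diagram exercises, the interface side could be summarized as in the sketch below. This is pieced together from the changelog and review excerpts later in this page, not copied from the actual source, so treat the shape as an approximation.

```java
import java.util.Map;

// Illustrative sketch only: the shape of the new restart API as described in this PR.
interface ClusterInstance {
    // Convenience overload described in the changelog: restart with no overrides.
    default void restart() throws Exception {
        restart(Map.of());
    }

    // Keys are node ids; the test excerpts below use -1 to apply an override to every server.
    void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception;
}
```

The Raft-based cluster instance then simply delegates to `KafkaClusterTestKit#restart`, which performs the shutdown, rebuild-with-overrides, and startup steps shown in the diagram.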
Hello @arvi18, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes

This pull request introduces a new `restart` method to the `KafkaClusterTestKit` class, enabling the overriding of server properties and restarting of the cluster within test extensions. This enhancement aims to simplify test cases that require updating broker configurations and restarting the cluster, avoiding redundant reimplementation of the same logic across multiple tests. The changes involve adding a `restart` method to the `ClusterInstance` interface and implementing it in `RaftClusterInvocationContext` and `KafkaClusterTestKit`. Additionally, the pull request includes new test cases to verify the functionality of the `restart` method with and without overridden configurations.

Highlights

- New `restart` method: A `restart` method is added to the `ClusterInstance` interface and implemented in `RaftClusterInvocationContext` and `KafkaClusterTestKit` to allow restarting the cluster with overridden configurations.
- Configuration Overrides: The `restart` method in `KafkaClusterTestKit` accepts a map of per-server configuration overrides, allowing specific configurations to be applied to individual brokers and controllers during restart (see the sketch after this list).
- Listener Management: The `restart` method captures the listeners before shutdown and reassigns them after restart to ensure proper reconnection.
- Test Cases: New test cases are added to `ClusterTestExtensionsTest` to verify the `restart` method's behavior with and without configuration overrides.
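A simplified restatement of that override merge (broker-wide key `-1` first, then the per-node entry, then the preserved listeners) might look like the sketch below. This is an illustration of the semantics described above, not the actual `KafkaClusterTestKit` code, and the literal `"listeners"` key stands in for `SocketServerConfigs.LISTENERS_CONFIG`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class RestartOverrideSketch {
    // Sketch of how a node's effective config could be assembled during restart:
    // start from the old config, layer on cluster-wide overrides, then node-specific
    // overrides, and finally pin the listeners captured before shutdown.
    static Map<String, Object> mergedConfig(Map<String, Object> originals,
                                            Map<Integer, Map<String, Object>> overrides,
                                            int nodeId,
                                            String preservedListeners) {
        Map<String, Object> config = new HashMap<>(originals);
        config.putAll(overrides.getOrDefault(-1, Collections.emptyMap()));     // applies to all servers
        config.putAll(overrides.getOrDefault(nodeId, Collections.emptyMap())); // this node wins last
        config.put("listeners", preservedListeners);                           // keep previously bound ports
        return config;
    }
}
```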
Changelog

- test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
  - Added `shutdown` method to properly shut down the cluster and capture listeners.
  - Added `restart` method to allow restarting the cluster with overridden configurations.
  - Modified the class to store listeners per node id.
  - Added logic to apply configuration overrides to brokers and controllers during restart.
  - Added logic to reassign listeners to brokers and controllers during restart.
- test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
  - Added a `restart` method with no arguments that calls the `restart` method with an empty map.
  - Added a `restart` method that accepts a map of per-server configuration overrides.
- test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
  - Implemented the `restart` method to delegate to the `clusterTestKit`.
- test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
  - Added a test case `testRestartWithOverriddenConfig` to verify the `restart` method with configuration overrides.
  - Added a test case `testRestartWithoutOverriddenConfig` to verify the `restart` method without configuration overrides.
A cluster restarts with grace,
Configs changed, finding its place.
Brokers awaken,
No longer forsaken,
A new test run starts the race.
Code Review

The pull request introduces a new `restart` method to the `KafkaClusterTestKit` and `ClusterInstance` interfaces, allowing for overriding server properties and restarting the cluster. This is a useful addition for integration tests that require dynamic configuration changes. The implementation appears to be well-structured, but there are a few areas that could be improved for clarity and robustness.

Summary of Findings

- Listener Management in Restart: The `restart` method captures and reapplies listener configurations. It's crucial to ensure this process is robust and handles various listener configurations correctly, especially in complex setups.
- Socket Factory Reinitialization: The `restart` method reinitializes the `socketFactoryManager`. It's important to verify that this reinitialization doesn't lead to resource leaks or conflicts with existing connections.
- Error Handling in Server Creation: The `restart` method includes error handling during the creation of `ControllerServer` and `BrokerServer`. It's important to ensure that these error handling mechanisms are comprehensive and prevent the cluster from entering an inconsistent state.

Merge Readiness

The changes introduce a valuable feature for testing Kafka clusters. However, there are a few areas that need to be addressed before merging. Specifically, the listener management and socket factory reinitialization in the `restart` method should be carefully reviewed to ensure robustness and prevent potential issues. Additionally, the error handling mechanisms during server creation should be comprehensive. I am unable to approve this pull request, and recommend that it not be merged until the high severity issues are addressed and the other comments are considered. Please have others review and approve this code before merging.
```java
Map<Integer, SharedServer> jointServers = new HashMap<>();
// ...
socketFactoryManager = new PreboundSocketFactoryManager();
```
Reinitializing `socketFactoryManager` here might lead to resource leaks if the old socket factories are not properly closed. Ensure that the old socket factories are closed before creating new ones.
```java
if (socketFactoryManager != null) {
    try {
        socketFactoryManager.close();
    } catch (Exception e) {
        log.warn("Exception while closing socketFactoryManager: {}", e.getMessage(), e);
    }
}
socketFactoryManager = new PreboundSocketFactoryManager();
```
```java
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
    listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
    listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
        broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
        broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
```
Consider extracting this logic for collecting listeners into a separate, well-named method to improve readability and maintainability. This would also reduce code duplication, as the same logic is repeated for brokers and controllers.
```java
private Set<String> collectListeners(SharedServer server, int nodeId) {
    Set<String> listeners = nodeIdToListeners.computeIfAbsent(nodeId, __ -> new HashSet<>());
    server.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
        listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
    });
    if (!server.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
        listeners.add(server.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
            server.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
            server.socketServer().controlPlaneAcceptorOpt().get().localPort());
    }
    return listeners;
}

// Usage in shutdown method:
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
Set<String> listeners = collectListeners(broker.sharedServer(), brokerId);
nodeIdToListeners.put(brokerId, listeners);
```
```java
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
```
It's crucial to ensure that the `shutdown()` method is robust and handles all possible exceptions gracefully. Consider adding more specific exception handling and logging to identify and address any potential issues during shutdown.
```java
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
    try {
        shutdown();
    } catch (Exception e) {
        log.error("Exception during shutdown: {}", e.getMessage(), e);
        throw e; // Re-throw the exception to prevent restart
    }
```
```java
Map<String, Object> config = controller.config().originals();
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));
```
This line uses `String.join` to create a comma-separated string of listeners. Ensure that the `nodeIdToListeners` map contains the correct listener information for each node before joining them. Also, consider adding a check to ensure that the listeners are not empty.
```java
Set<String> listenersForNode = nodeIdToListeners.get(id);
if (listenersForNode == null || listenersForNode.isEmpty()) {
    log.warn("No listeners found for node {}", id);
    // Handle the case where there are no listeners, possibly by using a default listener
}
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", listenersForNode != null ? listenersForNode : Collections.emptySet()));
```
```java
} catch (Throwable e) {
    log.error("Error creating controller {}", node.id(), e);
    Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);
    throw e;
```
The error handling here uses `Utils.swallow`. While this prevents the exception from propagating, it might mask important information about why the controller creation failed. Consider re-throwing a more specific exception or logging additional details to aid debugging.
```java
} catch (Throwable e) {
    log.error("Error creating controller {}", node.id(), e);
    try {
        sharedServer.stopForController();
    } catch (Throwable e2) {
        log.warn("sharedServer.stopForController error", e2);
    }
    throw new RuntimeException("Error creating controller " + node.id(), e);
}
```
```java
} catch (Throwable e) {
    log.error("Error creating broker {}", node.id(), e);
    Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker);
    throw e;
```
Similar to the controller creation, the error handling here uses `Utils.swallow`. Consider re-throwing a more specific exception or logging additional details to aid debugging.
```java
} catch (Throwable e) {
    log.error("Error creating broker {}", node.id(), e);
    try {
        sharedServer.stopForBroker();
    } catch (Throwable e2) {
        log.warn("sharedServer.stopForBroker error", e2);
    }
    throw new RuntimeException("Error creating broker " + node.id(), e);
}
```
Actionable comments posted: 4
🧹 Nitpick comments (2)
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java (1)

158-163: Add documentation clarifying when `restart()` may be invoked

The new convenience `restart()` method is excellent, but its lifecycle expectations are not obvious to users. Without guidance, someone might call `restart()` before the cluster has ever been started, which will currently fail down-stream (see comments on `KafkaClusterTestKit#shutdown`). A short Javadoc block like the following would avoid surprises:

```diff
+    /**
+     * Restart the cluster instance preserving existing listener ports.
+     *
+     * NOTE: The cluster must have been {@link #start() started} at least once,
+     * otherwise the underlying servers have no bound ports and restart will fail.
+     */
     default void restart() throws Exception {
         restart(Map.of());
     }
```

test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1)
335-346: Replace magic broker-wide override key `-1` with a named constant

Using a hard-coded `-1` to denote "apply to all servers" obscures intent and invites typos.

```diff
-        clusterInstance.restart(Collections.singletonMap(-1, Collections.singletonMap("default.replication.factor", 2)));
+        final int ALL_NODES = -1; // consider moving to ClusterInstance or a util
+        clusterInstance.restart(Collections.singletonMap(
+            ALL_NODES,
+            Collections.singletonMap("default.replication.factor", 2)));
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (3 hunks)
- test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java (1 hunk)
- test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java (1 hunk)
- test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1 hunk)
🔇 Additional comments (2)
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (1)
539-542: Inconsistent `KafkaConfig` constructor usage

Controllers are rebuilt with `new KafkaConfig(config, false)` while brokers use `new KafkaConfig(config)`. Pass the explicit two-arg form everywhere for consistency and to prevent unexpected logging:

```diff
-            KafkaConfig nodeConfig = new KafkaConfig(config);
+            KafkaConfig nodeConfig = new KafkaConfig(config, false);
```

test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1)
353-360: `getInt` may throw `ConfigException` – confirm property is parsed as int

`default.replication.factor` is typed as short in Kafka configs. `KafkaConfig#getInt` will attempt integer parsing and can throw if the value does not fit in an `int`. Please verify this passes on all supported JVMs; otherwise switch to `getShort` or `config().originals().get(...)` with an explicit cast.
```java
@Override
public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
    clusterTestKit.restart(perServerConfigOverrides);
}
```
Guard against restarting an instance that has never been started

If a test calls `clusterInstance.restart()` before `start()`, the underlying `KafkaClusterTestKit#shutdown` tries to gather listener information from `SocketServer` acceptors that are not yet created, resulting in NPEs and an empty `LISTENERS` config on restart.

Add a fast-fail guard:
```diff
@@
     @Override
     public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
-        clusterTestKit.restart(perServerConfigOverrides);
+        if (!started.get()) {
+            throw new IllegalStateException("Cannot restart a cluster that has not been started");
+        }
+        clusterTestKit.restart(perServerConfigOverrides);
     }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```java
@Override
public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
    if (!started.get()) {
        throw new IllegalStateException("Cannot restart a cluster that has not been started");
    }
    clusterTestKit.restart(perServerConfigOverrides);
}
```
```java
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
    shutdown();
    // ...
    Map<Integer, SharedServer> jointServers = new HashMap<>();
    // ...
    socketFactoryManager = new PreboundSocketFactoryManager();
    controllers.forEach((id, controller) -> {
        Map<String, Object> config = controller.config().originals();
        config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
        config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
        config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));
```
🛠️ Refactor suggestion
Port reservations are lost on restart – risk of “Address already in use”

`restart()` creates a fresh `PreboundSocketFactoryManager`, but the new manager is not told to re-use the previously bound ports captured in `nodeIdToListeners`. If another process grabs one of those ports between shutdown and restart, the cluster start-up will fail.

Consider feeding the cached listener URIs back into the new manager before constructing servers, e.g.:

```java
socketFactoryManager = new PreboundSocketFactoryManager();
nodeIdToListeners.forEach((id, listeners) ->
    listeners.forEach(l -> socketFactoryManager.reserve(id, l)));
```

(or expose a helper in `PreboundSocketFactoryManager`).
```java
brokers.forEach((id, broker) -> {
    Map<String, Object> config = broker.config().originals();
    config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
    config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
    config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));
    // ...
    TestKitNode node = nodes.brokerNodes().get(id);
    KafkaConfig nodeConfig = new KafkaConfig(config);
    SharedServer sharedServer = jointServers.computeIfAbsent(
        node.id(),
```
Mutating the live config map may have side-effects

`controller.config().originals()` and `broker.config().originals()` return the live backing map of the old `KafkaConfig`. Mutating it after shutdown is risky (if another thread still holds a reference) and obscures intent.

Use a defensive copy before modifications:

```diff
-            Map<String, Object> config = broker.config().originals();
+            Map<String, Object> config = new HashMap<>(broker.config().originals());
```

Apply the same change to the controller block above.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```java
brokers.forEach((id, broker) -> {
    Map<String, Object> config = new HashMap<>(broker.config().originals());
    config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
    config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
    config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

    TestKitNode node = nodes.brokerNodes().get(id);
    KafkaConfig nodeConfig = new KafkaConfig(config);
    SharedServer sharedServer = jointServers.computeIfAbsent(
        node.id(),
```
```java
public void shutdown() throws Exception {
    List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
    try {
        // Note the shutdown order here is chosen to be consistent with
        // `KafkaRaftServer`. See comments in that class for an explanation.
        for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
            int brokerId = entry.getKey();
            BrokerServer broker = entry.getValue();
            nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
            Set<String> listeners = nodeIdToListeners.get(brokerId);
            broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
                listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
            });
            if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
                listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                    broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                    broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
            }
            nodeIdToListeners.put(brokerId, listeners);
            futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
                executorService.submit((Runnable) broker::shutdown)));
        }
        waitForAllFutures(futureEntries);
        futureEntries.clear();
        for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
            int controllerId = entry.getKey();
            ControllerServer controller = entry.getValue();
            nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
            Set<String> listeners = nodeIdToListeners.get(controllerId);
            controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
                listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
            });
            if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
                listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                    controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                    controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
            }
            nodeIdToListeners.put(controllerId, listeners);
            futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
                executorService.submit(controller::shutdown)));
        }
        waitForAllFutures(futureEntries);
        futureEntries.clear();
        socketFactoryManager.close();
```
`shutdown()` assumes acceptors are present – restart will break if the cluster was never started

`broker.socketServer().dataPlaneAcceptors()` and the control-plane accessor are empty until `startup()` has bound the ports. When they are empty, `nodeIdToListeners.get(id)` remains `null`, and the subsequent `String.join(",", null)` in `restart()` throws an NPE.

Either bail out early when the servers were never started, or initialise `nodeIdToListeners` with the original listener strings from the config:

```diff
-            nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
+            nodeIdToListeners
+                .computeIfAbsent(brokerId, __ ->
+                    new HashSet<>(List.of(broker.config()
+                        .originals()
+                        .getOrDefault(SocketServerConfigs.LISTENERS_CONFIG, "")
+                        .toString().split(","))));
```

A similar fix is needed for the controller loop below. Without this, any test that formats the cluster but calls `restart()` before `startup()` will consistently fail.
In some test cases, like `ListOffsetsIntegrationTest#testListVersion`, we need to update broker config and restart. We can add a helper function for it, so we don't need to reimplement it in each case.
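For illustration, a test using the new helper might look roughly like the sketch below. It is hypothetical: the method name, config key, and value are placeholders, and it assumes the `ClusterInstance` injection style used by the test files in this PR, with `-1` as the broker-wide override key seen in the review excerpts above.

```java
import java.util.Map;

import org.apache.kafka.common.test.api.ClusterInstance;

// Hypothetical usage sketch, not code from this PR.
class RestartUsageSketch {
    // Imagine this as a @ClusterTest method with the ClusterInstance injected by the extension.
    void updateConfigAndRestart(ClusterInstance clusterInstance) throws Exception {
        // Override a broker config on every node (-1) and restart in place, instead of
        // re-implementing the shutdown/startup sequence in each test.
        Map<String, Object> brokerWideOverrides = Map.of("default.replication.factor", 2);
        clusterInstance.restart(Map.of(-1, brokerWideOverrides));
        // ... assertions that depend on the new configuration go here
    }
}
```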