KAFKA-17259: Support to override serverProperties and restart cluster in ClusterTestExtensions #20


Open · wants to merge 1 commit into base: trunk

Conversation

arvi18

@arvi18 arvi18 commented Apr 26, 2025

In some test cases, such as ListOffsetsIntegrationTest#testListVersion, we need to update the broker config and restart the cluster. We can add a helper function for this, so we don't need to reimplement it in each test case.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Summary by CodeRabbit

  • New Features
    • Added the ability to restart clusters with or without per-server configuration overrides.
  • Tests
    • Introduced new tests to verify cluster restart functionality and configuration override handling.

@arvi18
Author

arvi18 commented Apr 26, 2025

Hi @FrankYang0529 ,

Please consider the case below:

  1. Build a cluster.
  2. Shut down a broker.
  3. Restart the cluster.

The cluster may hang in this scenario. Thanks.

@arvi18
Author

arvi18 commented Apr 26, 2025

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


@arvi18
Author

arvi18 commented Apr 26, 2025

@FrankYang0529 could you please fix the conflicts?


coderabbitai bot commented Apr 26, 2025

Walkthrough

The changes introduce new restart and shutdown capabilities to the Kafka cluster test infrastructure. The KafkaClusterTestKit class gains public shutdown() and restart() methods, enabling asynchronous shutdown and configurable restart of clusters. The ClusterInstance interface and its implementation in RaftClusterInstance are extended to support cluster restarts with optional per-server configuration overrides. Two new tests are added to verify that cluster restarts correctly apply or preserve configuration overrides. Supporting fields and logic are introduced to cache listener addresses and manage socket factories during restarts.

Changes

File(s) Change Summary
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java Added public shutdown() and restart(Map) methods, new field for listener caching, and made socketFactoryManager mutable.
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java Added default and abstract restart() methods to the interface for standardized cluster restart capability.
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java Implemented the restart(Map) method in RaftClusterInstance, delegating to KafkaClusterTestKit.
test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java Added two tests to verify cluster restart behavior with and without configuration overrides.

Sequence Diagram(s)

sequenceDiagram
    participant Test as Test Method
    participant ClusterInstance
    participant RaftClusterInstance
    participant KafkaClusterTestKit

    Test->>ClusterInstance: restart(Map overrides)
    ClusterInstance->>RaftClusterInstance: restart(Map overrides)
    RaftClusterInstance->>KafkaClusterTestKit: restart(Map overrides)
    KafkaClusterTestKit->>KafkaClusterTestKit: shutdown()
    KafkaClusterTestKit->>KafkaClusterTestKit: Rebuild servers with overrides
    KafkaClusterTestKit->>KafkaClusterTestKit: startup()
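The flow in the diagram can be mirrored by a small self-contained toy class. Everything below (ToyCluster and its method names) is illustrative only and is not the actual KafkaClusterTestKit API:

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the restart flow in the diagram: shutdown, rebuild the
// config with overrides applied, then start up again. A sketch for
// illustration only, not the real KafkaClusterTestKit code.
class ToyCluster {
    private Map<String, Object> config;
    private boolean running = false;

    ToyCluster(Map<String, Object> initialConfig) {
        this.config = new HashMap<>(initialConfig);
    }

    void startup() { running = true; }

    void shutdown() { running = false; }

    void restart(Map<String, Object> overrides) {
        shutdown();
        Map<String, Object> rebuilt = new HashMap<>(config); // defensive copy
        rebuilt.putAll(overrides);                           // apply overrides
        config = rebuilt;
        startup();
    }

    boolean isRunning() { return running; }

    Object config(String key) { return config.get(key); }
}
```

A restart with an empty override map keeps the previously applied configuration, which matches the "with or without overrides" behavior the summary describes.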

Poem

Hopping through clusters, a rabbit at play,
Restart and shutdown, now part of the day!
With configs that change or quietly stay,
Tests keep us nimble, bugs kept at bay.
🐇✨ Cluster reborn, in a magical way!



@gemini-code-assist gemini-code-assist bot left a comment


Hello @arvi18, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

Summary of Changes

This pull request introduces a new restart method to the KafkaClusterTestKit class, enabling the overriding of server properties and restarting of the cluster within test extensions. This enhancement aims to simplify test cases that require updating broker configurations and restarting the cluster, avoiding redundant reimplementation of the same logic across multiple tests. The changes involve adding a restart method to the ClusterInstance interface and implementing it in RaftClusterInvocationContext and KafkaClusterTestKit. Additionally, the pull request includes new test cases to verify the functionality of the restart method with and without overridden configurations.

Highlights

  • New restart method: A restart method is added to the ClusterInstance interface and implemented in RaftClusterInvocationContext and KafkaClusterTestKit to allow restarting the cluster with overridden configurations.
  • Configuration Overrides: The restart method in KafkaClusterTestKit accepts a map of per-server configuration overrides, allowing specific configurations to be applied to individual brokers and controllers during restart.
  • Listener Management: The restart method captures the listeners before shutdown and reassigns them after restart to ensure proper reconnection.
  • Test Cases: New test cases are added to ClusterTestExtensionsTest to verify the restart method's behavior with and without configuration overrides.
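The override precedence in the highlights (a cluster-wide entry applied first, then per-node entries on top) can be sketched in isolation. The OverrideMerge helper below is a hypothetical illustration of that merge order, not code from the PR:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class OverrideMerge {
    // The PR uses -1 as the "apply to all servers" key; named here for clarity.
    static final int ALL_NODES = -1;

    // Later puts win: base config, then cluster-wide overrides, then
    // per-node overrides for the given node id.
    static Map<String, Object> effectiveConfig(
            Map<String, Object> base,
            Map<Integer, Map<String, Object>> perServer,
            int nodeId) {
        Map<String, Object> config = new HashMap<>(base);
        config.putAll(perServer.getOrDefault(ALL_NODES, Collections.emptyMap()));
        config.putAll(perServer.getOrDefault(nodeId, Collections.emptyMap()));
        return config;
    }
}
```

A node with its own entry sees that value; any other node falls back to the cluster-wide override, and nodes with neither keep the base config.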

Changelog

Click here to see the changelog
  • test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
    • Added shutdown method to properly shutdown the cluster and capture listeners.
    • Added restart method to allow restarting the cluster with overridden configurations.
    • Modified the class to store listeners per node id.
    • Added logic to apply configuration overrides to brokers and controllers during restart.
    • Added logic to reassign listeners to brokers and controllers during restart.
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
    • Added a restart method with no arguments that calls the restart method with an empty map.
    • Added a restart method that accepts a map of per-server configuration overrides.
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
    • Implemented the restart method to delegate to the clusterTestKit.
  • test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
    • Added a test case testRestartWithOverriddenConfig to verify the restart method with configuration overrides.
    • Added a test case testRestartWithoutOverriddenConfig to verify the restart method without configuration overrides.

A cluster restarts with grace,
Configs changed, finding its place.
Brokers awaken,
No longer forsaken,
A new test run starts the race.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

The pull request introduces a new restart method to the KafkaClusterTestKit and ClusterInstance interfaces, allowing for overriding server properties and restarting the cluster. This is a useful addition for integration tests that require dynamic configuration changes. The implementation appears to be well-structured, but there are a few areas that could be improved for clarity and robustness.

Summary of Findings

  • Listener Management in Restart: The restart method captures and reapplies listener configurations. It's crucial to ensure this process is robust and handles various listener configurations correctly, especially in complex setups.
  • Socket Factory Reinitialization: The restart method reinitializes the socketFactoryManager. It's important to verify that this reinitialization doesn't lead to resource leaks or conflicts with existing connections.
  • Error Handling in Server Creation: The restart method includes error handling during the creation of ControllerServer and BrokerServer. It's important to ensure that these error handling mechanisms are comprehensive and prevent the cluster from entering an inconsistent state.

Merge Readiness

The changes introduce a valuable feature for testing Kafka clusters. However, there are a few areas that need to be addressed before merging. Specifically, the listener management and socket factory reinitialization in the restart method should be carefully reviewed to ensure robustness and prevent potential issues. Additionally, the error handling mechanisms during server creation should be comprehensive. I am unable to approve this pull request, and recommend that it not be merged until the high severity issues are addressed and the other comments are considered. Please have others review and approve this code before merging.


Map<Integer, SharedServer> jointServers = new HashMap<>();

socketFactoryManager = new PreboundSocketFactoryManager();


high

Reinitializing socketFactoryManager here might lead to resource leaks if the old socket factories are not properly closed. Ensure that the old socket factories are closed before creating new ones.

        if (socketFactoryManager != null) {
            try {
                socketFactoryManager.close();
            } catch (Exception e) {
                log.warn("Exception while closing socketFactoryManager: {}", e.getMessage(), e);
            }
        }
        socketFactoryManager = new PreboundSocketFactoryManager();

Comment on lines +451 to +460
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
    listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
    listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
        broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
        broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}


medium

Consider extracting this logic for collecting listeners into a separate, well-named method to improve readability and maintainability. This would also reduce code duplication, as the same logic is repeated for brokers and controllers.

private Set<String> collectListeners(SharedServer server, int nodeId) {
    Set<String> listeners = nodeIdToListeners.computeIfAbsent(nodeId, __ -> new HashSet<>());
    server.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
        listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
    });
    if (!server.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
        listeners.add(server.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
            server.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
            server.socketServer().controlPlaneAcceptorOpt().get().localPort());
    }
    return listeners;
}

// Usage in shutdown method:
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
Set<String> listeners = collectListeners(broker.sharedServer(), brokerId);
nodeIdToListeners.put(brokerId, listeners);

}
}

public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {


medium

It's crucial to ensure that the shutdown() method is robust and handles all possible exceptions gracefully. Consider adding more specific exception handling and logging to identify and address any potential issues during shutdown.

    public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
        try {
            shutdown();
        } catch (Exception e) {
            log.error("Exception during shutdown: {}", e.getMessage(), e);
            throw e; // Re-throw the exception to prevent restart
        }

Map<String, Object> config = controller.config().originals();
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));


medium

This line uses String.join to create a comma-separated string of listeners. Ensure that the nodeIdToListeners map contains the correct listener information for each node before joining them. Also, consider adding a check to ensure that the listeners are not empty.

            Set<String> listenersForNode = nodeIdToListeners.get(id);
            if (listenersForNode == null || listenersForNode.isEmpty()) {
                log.warn("No listeners found for node {}", id);
                // Handle the case where there are no listeners, possibly by using a default listener
            }
            config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", listenersForNode != null ? listenersForNode : Collections.emptySet()));

Comment on lines +524 to +527
} catch (Throwable e) {
    log.error("Error creating controller {}", node.id(), e);
    Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);
    throw e;


medium

The error handling here uses Utils.swallow. While this prevents the exception from propagating, it might mask important information about why the controller creation failed. Consider re-throwing a more specific exception or logging additional details to aid debugging.

            } catch (Throwable e) {
                log.error("Error creating controller {}", node.id(), e);
                try {
                    sharedServer.stopForController();
                } catch (Throwable e2) {
                    log.warn("sharedServer.stopForController error", e2);
                }
                throw new RuntimeException("Error creating controller " + node.id(), e);
            }

Comment on lines +556 to +559
} catch (Throwable e) {
    log.error("Error creating broker {}", node.id(), e);
    Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker);
    throw e;


medium

Similar to the controller creation, the error handling here uses Utils.swallow. Consider re-throwing a more specific exception or logging additional details to aid debugging.

            } catch (Throwable e) {
                log.error("Error creating broker {}", node.id(), e);
                try {
                    sharedServer.stopForBroker();
                } catch (Throwable e2) {
                    log.warn("sharedServer.stopForBroker error", e2);
                }
                throw new RuntimeException("Error creating broker " + node.id(), e);
            }


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java (1)

158-163: Add documentation clarifying when restart() may be invoked

The new convenience restart() method is excellent, but its lifecycle expectations are not obvious to users.
Without guidance, someone might call restart() before the cluster has ever been started, which will currently fail down-stream (see comments on KafkaClusterTestKit#shutdown).

A short Javadoc block like the following would avoid surprises:

+    /**
+     * Restart the cluster instance preserving existing listener ports.
+     *
+     * NOTE: The cluster must have been {@link #start() started} at least once,
+     * otherwise the underlying servers have no bound ports and restart will fail.
+     */
     default void restart() throws Exception {
         restart(Map.of());
     }
test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1)

335-346: Replace magic broker-wide override key -1 with a named constant

Using a hard-coded -1 to denote “apply to all servers” obscures intent and invites typos.

-        clusterInstance.restart(Collections.singletonMap(-1, Collections.singletonMap("default.replication.factor", 2)));
+        final int ALL_NODES = -1; // consider moving to ClusterInstance or a util
+        clusterInstance.restart(Collections.singletonMap(
+                ALL_NODES,
+                Collections.singletonMap("default.replication.factor", 2)));
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and b8b3d0f.

📒 Files selected for processing (4)
  • test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (3 hunks)
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java (1 hunks)
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java (1 hunks)
  • test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1 hunks)
🔇 Additional comments (2)
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (1)

539-542: Inconsistent KafkaConfig constructor usage

Controllers are rebuilt with new KafkaConfig(config, false) while brokers use new KafkaConfig(config).
Pass the explicit two-arg form everywhere for consistency and to prevent unexpected logging:

-            KafkaConfig nodeConfig = new KafkaConfig(config);
+            KafkaConfig nodeConfig = new KafkaConfig(config, false);
test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1)

353-360: getInt may throw ConfigException – confirm property is parsed as int

default.replication.factor is typed as short in Kafka configs.
KafkaConfig#getInt will attempt integer parsing and can throw if the value does not fit in an int.

Please verify this passes on all supported JVMs; otherwise switch to getShort or config().originals().get(...) with explicit cast.

Comment on lines +196 to +199
@Override
public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
    clusterTestKit.restart(perServerConfigOverrides);
}

⚠️ Potential issue

Guard against restarting an instance that has never been started

If a test calls clusterInstance.restart() before start(), the underlying KafkaClusterTestKit#shutdown tries to gather listener
information from SocketServer acceptors that are not yet created, resulting in NPEs and an empty LISTENERS config on restart.

Add a fast-fail guard:

@@
     @Override
     public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
-        clusterTestKit.restart(perServerConfigOverrides);
+        if (!started.get()) {
+            throw new IllegalStateException("Cannot restart a cluster that has not been started");
+        }
+        clusterTestKit.restart(perServerConfigOverrides);
     }
Suggested change
    @Override
    public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
        if (!started.get()) {
            throw new IllegalStateException("Cannot restart a cluster that has not been started");
        }
        clusterTestKit.restart(perServerConfigOverrides);
    }

Comment on lines +495 to +506
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
    shutdown();

    Map<Integer, SharedServer> jointServers = new HashMap<>();

    socketFactoryManager = new PreboundSocketFactoryManager();
    controllers.forEach((id, controller) -> {
        Map<String, Object> config = controller.config().originals();
        config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
        config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
        config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));


🛠️ Refactor suggestion

Port reservations are lost on restart – risk of “Address already in use”

restart() creates a fresh PreboundSocketFactoryManager, but the new manager is not told to re-use the previously
bound ports captured in nodeIdToListeners.
If another process grabs one of those ports between shutdown and restart, the cluster start-up will fail.

Consider feeding the cached listener URIs back into the new manager before constructing servers, e.g.:

socketFactoryManager = new PreboundSocketFactoryManager();
nodeIdToListeners.forEach((id, listeners) ->
        listeners.forEach(l -> socketFactoryManager.reserve(id, l)));

(or expose a helper in PreboundSocketFactoryManager).

Comment on lines +533 to +542
brokers.forEach((id, broker) -> {
Map<String, Object> config = broker.config().originals();
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

TestKitNode node = nodes.brokerNodes().get(id);
KafkaConfig nodeConfig = new KafkaConfig(config);
SharedServer sharedServer = jointServers.computeIfAbsent(
node.id(),

⚠️ Potential issue

Mutating the live config map may have side-effects

controller.config().originals() and broker.config().originals() return the live backing map of the old
KafkaConfig. Mutating it after shutdown is risky (if another thread still holds a reference) and obscures intent.

Use a defensive copy before modifications:

-            Map<String, Object> config = broker.config().originals();
+            Map<String, Object> config = new HashMap<>(broker.config().originals());

Apply the same change to the controller block above.

Suggested change
 brokers.forEach((id, broker) -> {
-    Map<String, Object> config = broker.config().originals();
+    Map<String, Object> config = new HashMap<>(broker.config().originals());
     config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
     config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
     config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

     TestKitNode node = nodes.brokerNodes().get(id);
     KafkaConfig nodeConfig = new KafkaConfig(config);
     SharedServer sharedServer = jointServers.computeIfAbsent(
         node.id(),
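The hazard behind this suggestion is easy to reproduce with plain maps: if an accessor hands out its live backing map (as the comment says KafkaConfig#originals does), mutations are visible to every other holder of the reference, while a HashMap copy is not. LiveConfigDemo below is a hypothetical stand-in, not the Kafka class:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a config object whose accessor returns the
// live backing map rather than a snapshot.
class LiveConfigDemo {
    private final Map<String, Object> backing =
        new HashMap<>(Map.of("default.replication.factor", 1));

    // Like the originals() described above: no defensive copy.
    Map<String, Object> originals() { return backing; }
}
```

Mutating the returned map changes what every caller of `originals()` sees; copying it first leaves the original intact, which is exactly what the suggested `new HashMap<>(...)` wrapper buys.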

Comment on lines +443 to +486
public void shutdown() throws Exception {
    List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
    try {
        // Note the shutdown order here is chosen to be consistent with
        // `KafkaRaftServer`. See comments in that class for an explanation.
        for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
            int brokerId = entry.getKey();
            BrokerServer broker = entry.getValue();
            nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
            Set<String> listeners = nodeIdToListeners.get(brokerId);
            broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
                listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
            });
            if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
                listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                    broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                    broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
            }
            nodeIdToListeners.put(brokerId, listeners);
            futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
                executorService.submit((Runnable) broker::shutdown)));
        }
        waitForAllFutures(futureEntries);
        futureEntries.clear();
        for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
            int controllerId = entry.getKey();
            ControllerServer controller = entry.getValue();
            nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
            Set<String> listeners = nodeIdToListeners.get(controllerId);
            controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
                listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
            });
            if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
                listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                    controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                    controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
            }
            nodeIdToListeners.put(controllerId, listeners);
            futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
                executorService.submit(controller::shutdown)));
        }
        waitForAllFutures(futureEntries);
        futureEntries.clear();
        socketFactoryManager.close();

⚠️ Potential issue

shutdown() assumes acceptors are present – restart will break if the cluster was never started

broker.socketServer().dataPlaneAcceptors() and the control-plane accessor are empty until startup() has bound the ports.
When they are empty, nodeIdToListeners.get(id) remains null, and the subsequent String.join(",", null) in restart() throws an NPE.

Either bail out early when the servers were never started, or initialise nodeIdToListeners with the original listener strings from the config:

-                nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
+                nodeIdToListeners
+                        .computeIfAbsent(brokerId, __ ->
+                                new HashSet<>(List.of(broker.config()
+                                    .originals()
+                                    .getOrDefault(SocketServerConfigs.LISTENERS_CONFIG, "")
+                                    .toString().split(","))));

A similar fix is needed for the controller loop below.
Without this, any test that formats the cluster but calls restart() before startup() will consistently fail.
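The NPE described above is reproducible with plain collections: Map#get returns null for an unrecorded node id, and String.join(CharSequence, Iterable) rejects a null iterable. The joinListeners helper below is a hypothetical reduction of the restart() line in question:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class JoinNpeDemo {
    // Reduced form of the LISTENERS_CONFIG line in restart(): throws
    // NullPointerException when the node id was never recorded, because
    // get(id) returns null and String.join requires a non-null iterable.
    static String joinListeners(Map<Integer, Set<String>> nodeIdToListeners, int id) {
        return String.join(",", nodeIdToListeners.get(id));
    }
}
```

Pre-populating the map from the configured listeners, or failing fast before restart, avoids reaching this call with a null set.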
