Skip to content

Commit

Permalink
Address review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
tinaselenge committed Nov 20, 2024
1 parent ab3b3d6 commit d822b67
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3209,7 +3209,6 @@ public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers
final DescribeClusterResult result = env.adminClient().describeCluster(new DescribeClusterOptions().includeFencedBrokers(true));
TestUtils.assertFutureThrows(result.nodes(),
UnsupportedVersionException.class, "Including fenced broker endpoints is not supported with version 1");

}
}

Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3613,8 +3613,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection()
val describeClusterRequest = request.body[DescribeClusterRequest]
metadataCache.getBrokerNodes(request.context.listenerName).foreach { node =>
if (node.isFenced && !describeClusterRequest.data().includeFencedBrokers()) {
} else {
if (!node.isFenced || describeClusterRequest.data().includeFencedBrokers()) {
brokers.add(new DescribeClusterResponseData.DescribeClusterBroker().
setBrokerId(node.id).
setHost(node.host).
Expand Down
27 changes: 18 additions & 9 deletions tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -77,18 +80,24 @@ public void testListEndpointsWithBootstrapServer(ClusterInstance clusterInstance
int id = clusterInstance.brokerIds().iterator().next();
String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s";
String expected = String.format(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE", id, "localhost", port, "null", "unfenced", "broker");
assertTrue(output.equals(expected));
assertEquals(expected, output);
}

@ClusterTest(brokers = 1, types = {Type.KRAFT, Type.CO_KRAFT})
@ClusterTest(brokers = 2, types = {Type.KRAFT, Type.CO_KRAFT})
public void testListEndpointsArgumentWithBootstrapServer(ClusterInstance clusterInstance) {
String output = ToolsTestUtils.captureStandardOut(() ->
assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers()), "--include-fenced-brokers"));
String port = clusterInstance.bootstrapServers().split(":")[1];
int id = clusterInstance.brokerIds().iterator().next();
String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s";
String expected = String.format(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE", id, "localhost", port, "null", "unfenced", "broker");
assertTrue(output.equals(expected));
List<Integer> brokerIds = clusterInstance.brokerIds().stream().collect(Collectors.toList());
clusterInstance.shutdownBroker(brokerIds.get(0));

List<String> ports = Arrays.stream(clusterInstance.bootstrapServers().split(",")).map(b -> b.split(":")[1]).collect(Collectors.toList());
String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s";
String expected = String.format(format,
"ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE",
brokerIds.get(0), "localhost", ports.get(0), "null", "fenced", "broker",
brokerIds.get(1), "localhost", ports.get(1), "null", "unfenced", "broker");

String output = ToolsTestUtils.captureStandardOut(() -> assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers(), "--include-fenced-brokers")));

assertEquals(expected, output);
}

@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
Expand Down

0 comments on commit d822b67

Please sign in to comment.