diff --git a/build.gradle b/build.gradle index ec106864..d1410b36 100644 --- a/build.gradle +++ b/build.gradle @@ -38,7 +38,7 @@ allprojects { compile 'net.savantly:graphite-client:1.1.0-RELEASE' compile 'com.timgroup:java-statsd-client:3.0.1' compile 'com.signalfx.public:signalfx-codahale:0.0.47' - compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.1' + compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1' compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.1' testCompile 'org.mockito:mockito-core:2.24.0' testCompile 'org.testng:testng:6.8.8' diff --git a/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java b/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java index 9b7ad805..81349ec3 100644 --- a/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java +++ b/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import kafka.admin.AdminUtils; import kafka.admin.BrokerMetadata; -import kafka.controller.ReplicaAssignment; import kafka.server.ConfigType; import kafka.zk.KafkaZkClient; import org.apache.kafka.clients.admin.AdminClient; @@ -508,9 +507,9 @@ private static void reassignPartitions(KafkaZkClient zkClient, Collection } scala.collection.immutable.Set topicList = new scala.collection.immutable.Set.Set1<>(topic); - scala.collection.Map - currentAssignment = zkClient.getPartitionAssignmentForTopics(topicList).apply(topic); - String currentAssignmentJson = formatAsOldAssignmentJson(topic, currentAssignment); + scala.collection.Map> currentAssignment = + zkClient.getPartitionAssignmentForTopics(topicList).apply(topic); + String currentAssignmentJson = formatAsNewReassignmentJson(topic, currentAssignment); String newAssignmentJson = formatAsNewReassignmentJson(topic, assignedReplicas); LOGGER.info("Reassign partitions for topic " + topic); @@ -568,23 +567,25 @@ static boolean someBrokerNotElectedLeader(List partitionInfo * {"topic":"kmf-topic","partition":0,"replicas":[2,0]}]} * */ - private static String formatAsOldAssignmentJson(String topic, scala.collection.Map partitionsToBeReassigned) { - StringBuilder bldr = new StringBuilder(); - bldr.append("{\"version\":1,\"partitions\":[\n"); - for (int partition = 0; partition < partitionsToBeReassigned.size(); partition++) { - bldr.append(" {\"topic\":\"").append(topic).append("\",\"partition\":").append(partition).append(",\"replicas\":["); - ReplicaAssignment replicas = partitionsToBeReassigned.apply(partition); - for (int replicaIndex = 0; replicaIndex < replicas.replicas().size(); replicaIndex++) { - Object replica = replicas.replicas().apply(replicaIndex); - bldr.append(replica).append(","); - } - bldr.setLength(bldr.length() - 1); - bldr.append("]},\n"); - } - bldr.setLength(bldr.length() - 2); - bldr.append("]}"); - return bldr.toString(); - } + + // TODO (andrewchoi5): uncomment this method when Xinfra Monitor is upgraded to 'org.apache.kafka' 'kafka_2.12' version '2.4.1' +// private static String formatAsOldAssignmentJson(String topic, scala.collection.Map partitionsToBeReassigned) { +// StringBuilder bldr = new StringBuilder(); +// bldr.append("{\"version\":1,\"partitions\":[\n"); +// for (int partition = 0; partition < partitionsToBeReassigned.size(); partition++) { +// bldr.append(" {\"topic\":\"").append(topic).append("\",\"partition\":").append(partition).append(",\"replicas\":["); +// ReplicaAssignment replicas = partitionsToBeReassigned.apply(partition); +// for (int replicaIndex = 0; replicaIndex < replicas.replicas().size(); replicaIndex++) { +// Object replica = replicas.replicas().apply(replicaIndex); +// bldr.append(replica).append(","); +// } +// bldr.setLength(bldr.length() - 1); +// bldr.append("]},\n"); +// } +// bldr.setLength(bldr.length() - 2); +// bldr.append("]}"); +// return bldr.toString(); +// } /** * @param topic Kafka topic