Skip to content

Commit

Permalink
Use of org.apache.kafka - version: 2.3.1 #255
Browse files Browse the repository at this point in the history
Use of org.apache.kafka - version: 2.3.1

Justification:

Current linkedin kafka repository, "kafka": "com.linkedin.kafka:kafka_2.12:2.3.0.20", which internal kmf mp depends on, still uses version 2.3.0.20.
This linkedin kafka version uses def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, Seq[Int]]] as its parameters for the method getPartitionAssignmentForTopics.
However, the compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.1' uses def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, ReplicaAssignment]]
Thus, Apache Kafka 2.4.1 is not backward compatible with the internal KMF MP and the LinkedIn Kafka used inside it: the method signature of getPartitionAssignmentForTopics differs between LinkedIn Kafka and Apache Kafka.
Solution:
With the older version of apache kafka 2.3.1, there is no discrepancy between the two products.
I have verified that there are no breaking changes when reverting to Apache Kafka version 2.3.1.

I have added a todo item (andrewchoi5) to uncomment the related method when Xinfra Monitor is upgraded to 'org.apache.kafka' 'kafka_2.12' version '2.4.1' later, at which point we expect the linkedin kafka to have the parameters for getPartitionAssignmentForTopics updated.

/**
   * Gets the partition assignments for the given topics.
   * @param topics the topics whose partition assignments should be retrieved.
   * @return a map from each existing topic to its partition assignment
   *         (partition id -> assigned replica broker ids); topics with no
   *         ZooKeeper node are omitted.
   */
  def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, Seq[Int]]] = {
    // Issue one data request per topic, carrying the topic name in the request context
    // so it can be recovered from the matching response.
    val requests = topics.map(t => GetDataRequest(TopicZNode.path(t), ctx = Some(t))).toSeq
    val responses = retryRequestsUntilConnected(requests)
    responses.flatMap { response =>
      val topic = response.ctx.get.asInstanceOf[String]
      response.resultCode match {
        case Code.OK =>
          // Decode the topic znode payload into partition -> replica list.
          val assignment = TopicZNode.decode(topic, response.data).map {
            case (topicPartition, replicas) => topicPartition.partition -> replicas
          }
          Some(topic -> assignment)
        case Code.NONODE =>
          // Topic does not exist in ZooKeeper; silently skip it.
          None
        case _ =>
          // Any other result code is unexpected — propagate the underlying error.
          throw response.resultException.get
      }
    }.toMap
  }
Signed-off-by: Andrew Choi <[email protected]>
  • Loading branch information
Andrew Choi authored May 31, 2020
1 parent 31efd33 commit b4f9d9e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ allprojects {
compile 'net.savantly:graphite-client:1.1.0-RELEASE'
compile 'com.timgroup:java-statsd-client:3.0.1'
compile 'com.signalfx.public:signalfx-codahale:0.0.47'
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.1'
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.1'
testCompile 'org.mockito:mockito-core:2.24.0'
testCompile 'org.testng:testng:6.8.8'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminUtils;
import kafka.admin.BrokerMetadata;
import kafka.controller.ReplicaAssignment;
import kafka.server.ConfigType;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.AdminClient;
Expand Down Expand Up @@ -508,9 +507,9 @@ private static void reassignPartitions(KafkaZkClient zkClient, Collection<Node>
}

scala.collection.immutable.Set<String> topicList = new scala.collection.immutable.Set.Set1<>(topic);
scala.collection.Map<Object, ReplicaAssignment>
currentAssignment = zkClient.getPartitionAssignmentForTopics(topicList).apply(topic);
String currentAssignmentJson = formatAsOldAssignmentJson(topic, currentAssignment);
scala.collection.Map<Object, scala.collection.Seq<Object>> currentAssignment =
zkClient.getPartitionAssignmentForTopics(topicList).apply(topic);
String currentAssignmentJson = formatAsNewReassignmentJson(topic, currentAssignment);
String newAssignmentJson = formatAsNewReassignmentJson(topic, assignedReplicas);

LOGGER.info("Reassign partitions for topic " + topic);
Expand Down Expand Up @@ -568,23 +567,25 @@ static boolean someBrokerNotElectedLeader(List<TopicPartitionInfo> partitionInfo
* {"topic":"kmf-topic","partition":0,"replicas":[2,0]}]}
* </pre>
*/
// Renders a partition assignment as the legacy reassignment JSON:
// {"version":1,"partitions":[{"topic":...,"partition":N,"replicas":[...]}, ...]}
// NOTE(review): iterates partition ids 0..size-1 and calls apply(partition) —
// assumes partition ids are contiguous from 0; verify this invariant holds for callers.
private static String formatAsOldAssignmentJson(String topic, scala.collection.Map<Object, ReplicaAssignment> partitionsToBeReassigned) {
StringBuilder bldr = new StringBuilder();
bldr.append("{\"version\":1,\"partitions\":[\n");
for (int partition = 0; partition < partitionsToBeReassigned.size(); partition++) {
bldr.append(" {\"topic\":\"").append(topic).append("\",\"partition\":").append(partition).append(",\"replicas\":[");
// Look up the assignment for this partition id (boxed to Object for the Scala Map).
ReplicaAssignment replicas = partitionsToBeReassigned.apply(partition);
for (int replicaIndex = 0; replicaIndex < replicas.replicas().size(); replicaIndex++) {
Object replica = replicas.replicas().apply(replicaIndex);
bldr.append(replica).append(",");
}
// Drop the trailing comma after the last replica id.
bldr.setLength(bldr.length() - 1);
bldr.append("]},\n");
}
// Drop the trailing ",\n" after the last partition entry.
bldr.setLength(bldr.length() - 2);
bldr.append("]}");
return bldr.toString();
}

// TODO (andrewchoi5): uncomment this method when Xinfra Monitor is upgraded to 'org.apache.kafka' 'kafka_2.12' version '2.4.1'
// private static String formatAsOldAssignmentJson(String topic, scala.collection.Map<Object, ReplicaAssignment> partitionsToBeReassigned) {
// StringBuilder bldr = new StringBuilder();
// bldr.append("{\"version\":1,\"partitions\":[\n");
// for (int partition = 0; partition < partitionsToBeReassigned.size(); partition++) {
// bldr.append(" {\"topic\":\"").append(topic).append("\",\"partition\":").append(partition).append(",\"replicas\":[");
// ReplicaAssignment replicas = partitionsToBeReassigned.apply(partition);
// for (int replicaIndex = 0; replicaIndex < replicas.replicas().size(); replicaIndex++) {
// Object replica = replicas.replicas().apply(replicaIndex);
// bldr.append(replica).append(",");
// }
// bldr.setLength(bldr.length() - 1);
// bldr.append("]},\n");
// }
// bldr.setLength(bldr.length() - 2);
// bldr.append("]}");
// return bldr.toString();
// }

/**
* @param topic Kafka topic
Expand Down

0 comments on commit b4f9d9e

Please sign in to comment.