From 80a6ba0ff6eab4706a3c2deb4ecbf076d1d1c543 Mon Sep 17 00:00:00 2001 From: Andrew Choi Date: Sun, 31 May 2020 11:48:26 -0700 Subject: [PATCH] Update Apache kafka kafka-clients version 2.3.1 #256 Our internal kmf mp replies not on apache kafka, but linkedin/kafka which still to this date replies on the old copy - 2.3.*. It makes more sense to use java kafka-clients 2.3.1 rather than 2.4.1 to better monitor our kafka clusters. compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1' Signed-off-by: Andrew Choi --- build.gradle | 2 +- .../com/linkedin/kmf/services/ConsumeService.java | 5 ++--- .../services/MultiClusterTopicManagementService.java | 12 +++--------- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/build.gradle b/build.gradle index d1410b36..091bbf0f 100644 --- a/build.gradle +++ b/build.gradle @@ -39,7 +39,7 @@ allprojects { compile 'com.timgroup:java-statsd-client:3.0.1' compile 'com.signalfx.public:signalfx-codahale:0.0.47' compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1' - compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.1' + compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1' testCompile 'org.mockito:mockito-core:2.24.0' testCompile 'org.testng:testng:6.8.8' } diff --git a/src/main/java/com/linkedin/kmf/services/ConsumeService.java b/src/main/java/com/linkedin/kmf/services/ConsumeService.java index 1a6535d8..307dffab 100644 --- a/src/main/java/com/linkedin/kmf/services/ConsumeService.java +++ b/src/main/java/com/linkedin/kmf/services/ConsumeService.java @@ -37,7 +37,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.utils.SystemTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -241,8 +241,7 @@ public synchronized void start() { @SuppressWarnings("ConstantConditions") double partitionCount = topicDescription.partitions().size(); topicPartitionCount.add( - new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.", tags), - new CumulativeSum(partitionCount)); + new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.", tags), new Total(partitionCount)); } } diff --git a/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java b/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java index 81349ec3..d2eb667d 100644 --- a/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java +++ b/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java @@ -37,12 +37,10 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreatePartitionsResult; -import org.apache.kafka.clients.admin.ElectLeadersOptions; -import org.apache.kafka.clients.admin.ElectLeadersResult; +import org.apache.kafka.clients.admin.ElectPreferredLeadersResult; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; @@ -481,13 +479,9 @@ private void triggerPreferredLeaderElection(List partitionIn for (TopicPartitionInfo javaPartitionInfo : partitionInfoList) { partitions.add(new TopicPartition(partitionTopic, javaPartitionInfo.partition())); } + ElectPreferredLeadersResult electPreferredLeadersResult = _adminClient.electPreferredLeaders(partitions); - ElectLeadersOptions newOptions = new ElectLeadersOptions(); - ElectionType electionType = ElectionType.PREFERRED; - Set topicPartitions = new HashSet<>(partitions); - ElectLeadersResult electLeadersResult = _adminClient.electLeaders(electionType, topicPartitions, newOptions); - - LOGGER.info("{}: triggerPreferredLeaderElection - {}", this.getClass().toString(), electLeadersResult.all().get()); + LOGGER.info("{}: triggerPreferredLeaderElection - {}", this.getClass().toString(), electPreferredLeadersResult.all().get()); } private static void reassignPartitions(KafkaZkClient zkClient, Collection brokers, String topic, int partitionCount, int replicationFactor) {