Skip to content

Commit

Permalink
Update Apache kafka kafka-clients version 2.3.1 #256
Browse files Browse the repository at this point in the history
Our internal kmf mp replies not on apache kafka, but linkedin/kafka which still to this date replies on the old copy - 2.3.*.
It makes more sense to use java kafka-clients 2.3.1 rather than 2.4.1 to better monitor our kafka clusters.
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'

Signed-off-by: Andrew Choi <[email protected]>
  • Loading branch information
Andrew Choi authored May 31, 2020
1 parent b4f9d9e commit 80a6ba0
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 13 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ allprojects {
compile 'com.timgroup:java-statsd-client:3.0.1'
compile 'com.signalfx.public:signalfx-codahale:0.0.47'
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.1'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
testCompile 'org.mockito:mockito-core:2.24.0'
testCompile 'org.testng:testng:6.8.8'
}
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/linkedin/kmf/services/ConsumeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -241,8 +241,7 @@ public synchronized void start() {
@SuppressWarnings("ConstantConditions")
double partitionCount = topicDescription.partitions().size();
topicPartitionCount.add(
new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.", tags),
new CumulativeSum(partitionCount));
new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.", tags), new Total(partitionCount));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.clients.admin.ElectPreferredLeadersResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -481,13 +479,9 @@ private void triggerPreferredLeaderElection(List<TopicPartitionInfo> partitionIn
for (TopicPartitionInfo javaPartitionInfo : partitionInfoList) {
partitions.add(new TopicPartition(partitionTopic, javaPartitionInfo.partition()));
}
ElectPreferredLeadersResult electPreferredLeadersResult = _adminClient.electPreferredLeaders(partitions);

ElectLeadersOptions newOptions = new ElectLeadersOptions();
ElectionType electionType = ElectionType.PREFERRED;
Set<TopicPartition> topicPartitions = new HashSet<>(partitions);
ElectLeadersResult electLeadersResult = _adminClient.electLeaders(electionType, topicPartitions, newOptions);

LOGGER.info("{}: triggerPreferredLeaderElection - {}", this.getClass().toString(), electLeadersResult.all().get());
LOGGER.info("{}: triggerPreferredLeaderElection - {}", this.getClass().toString(), electPreferredLeadersResult.all().get());
}

private static void reassignPartitions(KafkaZkClient zkClient, Collection<Node> brokers, String topic, int partitionCount, int replicationFactor) {
Expand Down

0 comments on commit 80a6ba0

Please sign in to comment.