diff --git a/build.gradle b/build.gradle index c4c8d60c..7a58826a 100644 --- a/build.gradle +++ b/build.gradle @@ -38,8 +38,8 @@ allprojects { compile 'net.savantly:graphite-client:1.1.0-RELEASE' compile 'com.timgroup:java-statsd-client:3.0.1' compile 'com.signalfx.public:signalfx-codahale:0.0.47' - compile group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.5.0' - compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0' + compile group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.4.1' + compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.1' testCompile 'org.mockito:mockito-core:2.24.0' testCompile 'org.testng:testng:6.8.8' } diff --git a/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java b/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java index ee8f5d47..9b7ad805 100644 --- a/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java +++ b/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java @@ -54,7 +54,6 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import scala.Option$; import scala.collection.Seq; @@ -388,9 +387,9 @@ private Set getAvailableBrokers() throws ExecutionException, InterruptedEx } void maybeReassignPartitionAndElectLeader() throws Exception { - try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSaslEnabled(), + try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(), com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS, com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS, - Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null, Option.apply(null))) { + Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null)) { List partitionInfoList = _adminClient .describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions(); @@ -463,8 +462,8 @@ void maybeElectLeader() throws Exception { return; } - try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSaslEnabled(), com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS, - com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null, null)) { + try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(), com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS, + com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null)) { if (!zkClient.reassignPartitionsInProgress()) { List partitionInfoList = _adminClient .describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();