From d1edf726698c22a20c4ab2b451be096f0582de8d Mon Sep 17 00:00:00 2001 From: Anton Kirillov Date: Mon, 13 Jun 2016 13:53:51 +0200 Subject: [PATCH] initial migration to Kafka 0.10 --- build.gradle | 9 ++-- gradle/wrapper/gradle-wrapper.properties | 3 +- .../ly/stealth/mesos/kafka/Cluster.scala | 10 ++-- src/scala/ly/stealth/mesos/kafka/Expr.scala | 8 ++-- .../ly/stealth/mesos/kafka/Migration.scala | 6 +-- .../ly/stealth/mesos/kafka/Rebalancer.scala | 47 +++++++++---------- .../ly/stealth/mesos/kafka/Scheduler.scala | 4 +- src/scala/ly/stealth/mesos/kafka/Topics.scala | 32 ++++++------- .../stealth/mesos/kafka/RebalancerTest.scala | 25 +++++++--- 9 files changed, 79 insertions(+), 65 deletions(-) diff --git a/build.gradle b/build.gradle index 27c7553..4a8a70c 100644 --- a/build.gradle +++ b/build.gradle @@ -10,9 +10,10 @@ version = new File('src/scala/ly/stealth/mesos/kafka/Scheduler.scala').readLines jar.archiveName = "kafka-mesos-${version}.jar" -def mesosVersion="0.25.0" -def kafkaVersion="0.8.1.1" -def kafkaScalaVersion="2.10" +def mesosVersion="0.28.1" +def kafkaVersion="0.10.0.0" +def kafkaScalaVersion="2.11" +def scalaVersion="2.11.6" repositories { mavenCentral() @@ -34,7 +35,7 @@ sourceSets { def jettyVersion = "9.0.4.v20130625" dependencies { - compile "org.scala-lang:scala-library:2.10.6" + compile "org.scala-lang:scala-library:$scalaVersion" compile "org.apache.mesos:mesos:$mesosVersion" compile name: "util-mesos-0.1.0.0" compile "com.google.protobuf:protobuf-java:2.5.0" diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ac5bd45..23de0ac 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ +#Mon Jun 13 09:11:33 CEST 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-all.zip diff --git a/src/scala/ly/stealth/mesos/kafka/Cluster.scala b/src/scala/ly/stealth/mesos/kafka/Cluster.scala index 6806eff..75fa6b8 100644 --- a/src/scala/ly/stealth/mesos/kafka/Cluster.scala +++ b/src/scala/ly/stealth/mesos/kafka/Cluster.scala @@ -18,14 +18,16 @@ package ly.stealth.mesos.kafka import java.util + import scala.util.parsing.json.{JSONArray, JSONObject} import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ListBuffer import java.util.Collections -import java.io.{FileWriter, File} +import java.io.{File, FileWriter} + import org.I0Itec.zkclient.ZkClient -import kafka.utils.ZKStringSerializer +import kafka.utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.exception.ZkNodeExistsException import net.elodina.mesos.util.Version @@ -136,7 +138,7 @@ object Cluster { class ZkStorage(val path: String) extends Storage { createChrootIfRequired() - def zkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer) + def zkClient: ZkClient = ZkUtils.createZkClient(Config.zk, 30000, 30000) private def createChrootIfRequired(): Unit = { val slashIdx: Int = Config.zk.indexOf('/') @@ -145,7 +147,7 @@ object Cluster { val chroot = Config.zk.substring(slashIdx) val zkConnect = Config.zk.substring(0, slashIdx) - val client = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + val client = ZkUtils.createZkClient(Config.zk, 30000, 30000) try { client.createPersistent(chroot, true) } finally { client.close() } } diff --git a/src/scala/ly/stealth/mesos/kafka/Expr.scala b/src/scala/ly/stealth/mesos/kafka/Expr.scala index 03e566a..817e3b1 100644 --- a/src/scala/ly/stealth/mesos/kafka/Expr.scala +++ b/src/scala/ly/stealth/mesos/kafka/Expr.scala @@ -165,10 +165,10 @@ object Expr { def expandTopics(expr: String): util.List[String] = { val topics = new util.TreeSet[String]() - val zkClient = newZkClient + val zkUtils = newZkUtils var allTopics: util.List[String] = null - try { allTopics = ZkUtils.getAllTopics(zkClient) } - finally { zkClient.close() } + try { allTopics = zkUtils.getAllTopics() } + finally { zkUtils.close() } for (part <- expr.split(",").map(_.trim).filter(!_.isEmpty)) { if (!part.endsWith("*")) topics.add(part) @@ -189,5 +189,5 @@ object Expr { out.println(" t* - topics starting with 't'") } - private def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer) + private def newZkUtils: ZkUtils = ZkUtils(Config.zk, 30000, 30000, isZkSecurityEnabled = false)//todo: parametrize isZkSecurityEnabled } diff --git a/src/scala/ly/stealth/mesos/kafka/Migration.scala b/src/scala/ly/stealth/mesos/kafka/Migration.scala index fcefc81..a183ef3 100644 --- a/src/scala/ly/stealth/mesos/kafka/Migration.scala +++ b/src/scala/ly/stealth/mesos/kafka/Migration.scala @@ -16,7 +16,7 @@ object Migration { private val values: util.TreeMap[Version, Migration] = new util.TreeMap[Version, Migration]() private def add(m: Migration): Unit = { values.put(m.version, m); } - add(M_0_9_5_1) + add(M_0_10_0_0) def all: util.List[Migration] = new util.ArrayList(values.values()) @@ -31,8 +31,8 @@ object Migration { result } - object M_0_9_5_1 extends Migration { - def version: Version = new Version("0.9.5.1") + object M_0_10_0_0 extends Migration { + def version: Version = new Version("0.10.0.0") def apply(_json: Map[String, Object]): Map[String, Object] = { val json = new mutable.HashMap[String, Object]() ++ _json diff --git a/src/scala/ly/stealth/mesos/kafka/Rebalancer.scala b/src/scala/ly/stealth/mesos/kafka/Rebalancer.scala index 156a637..70aeccd 100644 --- a/src/scala/ly/stealth/mesos/kafka/Rebalancer.scala +++ b/src/scala/ly/stealth/mesos/kafka/Rebalancer.scala @@ -21,14 +21,12 @@ import java.util import scala.Some import scala.collection.JavaConversions._ -import scala.collection.{mutable, Seq, Map} - +import scala.collection.{Map, Seq, mutable} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException - import kafka.admin._ -import kafka.common.TopicAndPartition -import kafka.utils.{ZkUtils, ZKStringSerializer} +import kafka.common.{AdminCommandFailedException, TopicAndPartition} +import kafka.utils.{ZKStringSerializer, ZkUtils} import net.elodina.mesos.util.Period import org.apache.log4j.Logger @@ -38,10 +36,10 @@ class Rebalancer { @volatile private var assignment: Map[TopicAndPartition, Seq[Int]] = null @volatile private var reassignment: Map[TopicAndPartition, Seq[Int]] = null - private def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer) + private def newZkUtils: ZkUtils = ZkUtils(Config.zk, 30000, 30000, isZkSecurityEnabled = false)//todo: parametrize isZkSecurityEnabled def running: Boolean = { - val zkClient = newZkClient + val zkClient = newZkUtils.zkClient try { zkClient.exists(ZkUtils.ReassignPartitionsPath) } finally { zkClient.close() } } @@ -50,16 +48,17 @@ class Rebalancer { if (topics.isEmpty) throw new Rebalancer.Exception("no topics") logger.info(s"Starting rebalance for topics ${topics.mkString(",")} on brokers ${brokers.mkString(",")} with ${if (replicas == -1) "" else replicas} replicas") - val zkClient = newZkClient + val zkUtils = newZkUtils try { - val assignment: Map[TopicAndPartition, Seq[Int]] = ZkUtils.getReplicaAssignmentForTopics(zkClient, topics) - val reassignment: Map[TopicAndPartition, Seq[Int]] = getReassignments(brokers.map(Integer.parseInt), topics, assignment, replicas) + val assignment: Map[TopicAndPartition, Seq[Int]] = zkUtils.getReplicaAssignmentForTopics(topics) + val reassignment: Map[TopicAndPartition, Seq[Int]] = getReassignments(brokers.map(b => BrokerMetadata(b.toInt, None)), topics, assignment, replicas) + + reassignPartitions(zkUtils, reassignment) - reassignPartitions(zkClient, reassignment) this.reassignment = reassignment this.assignment = assignment } finally { - zkClient.close() + zkUtils.close() } } @@ -67,9 +66,9 @@ class Rebalancer { if (assignment == null) return "" var s = "" - val zkClient = newZkClient + val zkUtils = newZkUtils try { - val reassigning: Map[TopicAndPartition, Seq[Int]] = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) + val reassigning: Map[TopicAndPartition, Seq[Int]] = zkUtils.getPartitionsBeingReassigned.mapValues(_.newReplicas) val byTopic: Map[String, Map[TopicAndPartition, Seq[Int]]] = assignment.groupBy(tp => tp._1.topic) for (topic <- byTopic.keys.to[List].sorted) { @@ -81,12 +80,12 @@ class Rebalancer { s += " " + topicAndPartition.partition + ": " + brokers.mkString(",") s += " -> " s += reassignment.getOrElse(topicAndPartition, null).mkString(",") - s += " - " + getReassignmentState(zkClient, topicAndPartition, reassigning) + s += " - " + getReassignmentState(zkUtils, topicAndPartition, reassigning) s += "\n" } } } finally { - zkClient.close() + zkUtils.close() } s @@ -105,7 +104,7 @@ class Rebalancer { matches } - private def getReassignments(brokerIds: Seq[Int], topics: util.List[String], assignment: Map[TopicAndPartition, Seq[Int]], replicas: Int = -1): Map[TopicAndPartition, Seq[Int]] = { + private def getReassignments(brokerMetadata: Seq[BrokerMetadata], topics: util.List[String], assignment: Map[TopicAndPartition, Seq[Int]], replicas: Int = -1): Map[TopicAndPartition, Seq[Int]] = { var reassignment : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() val byTopic: Map[String, Map[TopicAndPartition, Seq[Int]]] = assignment.groupBy(tp => tp._1.topic) @@ -114,32 +113,32 @@ class Rebalancer { val rf: Int = if (replicas != -1) replicas else entry._2.valuesIterator.next().size val partitions: Int = entry._2.size - val assignedReplicas: Map[Int, Seq[Int]] = AdminUtils.assignReplicasToBrokers(brokerIds, partitions, rf, 0, 0) + val assignedReplicas: Map[Int, Seq[Int]] = AdminUtils.assignReplicasToBrokers(brokerMetadata, partitions, rf, 0, 0) reassignment ++= assignedReplicas.map(replicaEntry => TopicAndPartition(topic, replicaEntry._1) -> replicaEntry._2) } reassignment.toMap } - private def getReassignmentState(zkClient: ZkClient, topicAndPartition: TopicAndPartition, reassigning: Map[TopicAndPartition, Seq[Int]]): String = { + private def getReassignmentState(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition, reassigning: Map[TopicAndPartition, Seq[Int]]): String = { reassigning.get(topicAndPartition) match { case Some(partition) => "running" case None => // check if the current replica assignment matches the expected one after reassignment val assignedBrokers = reassignment(topicAndPartition) - val brokers = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) + val brokers = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition) if(brokers.sorted == assignedBrokers.sorted) "done" else "error" } } - private def reassignPartitions(zkClient: ZkClient, partitions: Map[TopicAndPartition, Seq[Int]]): Unit = { + private def reassignPartitions(zkUtils: ZkUtils, partitions: Map[TopicAndPartition, Seq[Int]]): Unit = { try { - val json = ZkUtils.getPartitionReassignmentZkData(partitions) - ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, json) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitions) + reassignPartitionsCommand.reassignPartitions() } catch { - case ze: ZkNodeExistsException => throw new Rebalancer.Exception("rebalance is in progress") + case ze: AdminCommandFailedException => throw new Rebalancer.Exception(ze.getMessage) } } } diff --git a/src/scala/ly/stealth/mesos/kafka/Scheduler.scala b/src/scala/ly/stealth/mesos/kafka/Scheduler.scala index 8adb4d2..7e51eaa 100644 --- a/src/scala/ly/stealth/mesos/kafka/Scheduler.scala +++ b/src/scala/ly/stealth/mesos/kafka/Scheduler.scala @@ -33,7 +33,7 @@ import scala.collection.mutable object Scheduler extends org.apache.mesos.Scheduler { private val logger: Logger = Logger.getLogger(this.getClass) - val version: Version = new Version("0.9.5.1") + val version: Version = new Version("0.10.0.0") val cluster: Cluster = new Cluster() private var driver: SchedulerDriver = null @@ -419,7 +419,7 @@ object Scheduler extends org.apache.mesos.Scheduler { credsBuilder = Credential.newBuilder() credsBuilder.setPrincipal(Config.principal) - credsBuilder.setSecret(ByteString.copyFromUtf8(Config.secret)) + credsBuilder.setSecret(Config.secret) } val driver = diff --git a/src/scala/ly/stealth/mesos/kafka/Topics.scala b/src/scala/ly/stealth/mesos/kafka/Topics.scala index d2ad99e..078bcb0 100644 --- a/src/scala/ly/stealth/mesos/kafka/Topics.scala +++ b/src/scala/ly/stealth/mesos/kafka/Topics.scala @@ -32,7 +32,7 @@ import ly.stealth.mesos.kafka.Topics.Topic import kafka.log.LogConfig class Topics { - private def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer) + private def newZkUtils: ZkUtils = ZkUtils(Config.zk, 30000, 30000, isZkSecurityEnabled = false)//todo: parametrize isZkSecurityEnabled def getTopic(name: String): Topics.Topic = { if (name == null) return null @@ -41,13 +41,13 @@ class Topics { } def getTopics: util.List[Topics.Topic] = { - val zkClient = newZkClient + val zkUtils = newZkUtils try { - var names = ZkUtils.getAllTopics(zkClient) + val names = zkUtils.getAllTopics - val assignments: mutable.Map[String, Map[Int, Seq[Int]]] = ZkUtils.getPartitionAssignmentForTopics(zkClient, names) - val configs = AdminUtils.fetchAllTopicConfigs(zkClient) + val assignments: mutable.Map[String, Map[Int, Seq[Int]]] = zkUtils.getPartitionAssignmentForTopics(names) + val configs = AdminUtils.fetchAllTopicConfigs(zkUtils) val topics = new util.ArrayList[Topics.Topic] for (name <- names.sorted) @@ -59,7 +59,7 @@ class Topics { topics } finally { - zkClient.close() + zkUtils.close() } } @@ -67,12 +67,12 @@ class Topics { var brokers_ = brokers if (brokers_ == null) { - val zkClient = newZkClient - try { brokers_ = ZkUtils.getSortedBrokerList(zkClient)} - finally { zkClient.close() } + val zkUtils = newZkUtils + try { brokers_ = zkUtils.getSortedBrokerList()} + finally { zkUtils.close() } } - AdminUtils.assignReplicasToBrokers(brokers_, partitions, replicas, 0, 0).mapValues(new util.ArrayList[Int](_)) + AdminUtils.assignReplicasToBrokers(brokers_.map(b => BrokerMetadata(b, None)), partitions, replicas, 0, 0).mapValues(new util.ArrayList[Int](_)) } def addTopic(name: String, assignment: util.Map[Int, util.List[Int]] = null, options: util.Map[String, String] = null): Topic = { @@ -83,9 +83,9 @@ class Topics { if (options != null) for ((k, v) <- options) config.setProperty(k, v) - val zkClient = newZkClient - try { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, name, assignment_.mapValues(_.toList), config) } - finally { zkClient.close() } + val zkUtils = newZkUtils + try { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, name, assignment_.mapValues(_.toList), config) } + finally { zkUtils.close() } getTopic(name) } @@ -94,9 +94,9 @@ class Topics { val config: Properties = new Properties() for ((k, v) <- options) config.setProperty(k, v) - val zkClient = newZkClient - try { AdminUtils.changeTopicConfig(zkClient, topic.name, config) } - finally { zkClient.close() } + val zkUtils = newZkUtils + try { AdminUtils.changeTopicConfig(zkUtils, topic.name, config) } + finally { zkUtils.close() } } def validateOptions(options: util.Map[String, String]): String = { diff --git a/src/test/ly/stealth/mesos/kafka/RebalancerTest.scala b/src/test/ly/stealth/mesos/kafka/RebalancerTest.scala index ba8545d..f517fdd 100644 --- a/src/test/ly/stealth/mesos/kafka/RebalancerTest.scala +++ b/src/test/ly/stealth/mesos/kafka/RebalancerTest.scala @@ -17,16 +17,20 @@ package ly.stealth.mesos.kafka -import org.junit.{Test, After, Before} +import org.junit.{After, Before, Test} import org.junit.Assert._ import org.I0Itec.zkclient.ZkClient import kafka.utils.{ZKStringSerializer, ZkUtils} + import scala.collection.JavaConversions._ import java.util +import kafka.common.TopicAndPartition +import org.apache.log4j.Logger + class RebalancerTest extends KafkaMesosTestCase { var rebalancer: Rebalancer = null - var zkClient: ZkClient = null + var zkUtils: ZkUtils = null @Before override def before { @@ -37,8 +41,7 @@ class RebalancerTest extends KafkaMesosTestCase { Config.zk = s"localhost:$port" startZkServer() - zkClient = zkServer.getZkClient - zkClient.setZkSerializer(ZKStringSerializer) + zkUtils = ZkUtils(Config.zk, 3000, 3000, isZkSecurityEnabled = false) } @After @@ -64,9 +67,17 @@ class RebalancerTest extends KafkaMesosTestCase { @Test def start_in_progress { Scheduler.cluster.topics.addTopic("topic", Map(0 -> util.Arrays.asList(0), 1 -> util.Arrays.asList(0))) - ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, "") - try { rebalancer.start(util.Arrays.asList("t1"), util.Arrays.asList("0", "1")); fail() } - catch { case e: Rebalancer.Exception => assertTrue(e.getMessage, e.getMessage.contains("in progress")) } + val partitionsReassignmentData = Map(TopicAndPartition("topic", 0) -> Seq(0,1)) + val jsonReassignmentData = zkUtils.formatAsReassignmentJson(partitionsReassignmentData) + zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData) + + try { + rebalancer.start(util.Arrays.asList("topic"), util.Arrays.asList("0", "1")) + fail() + } + catch { + case e: Rebalancer.Exception => assertTrue(e.getMessage, e.getMessage.contains("in progress")) + } } }