Skip to content

Commit

Permalink
initial migration to Kafka 0.10
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Kirillov committed Jun 13, 2016
1 parent 3d8186b commit d1edf72
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 65 deletions.
9 changes: 5 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ version = new File('src/scala/ly/stealth/mesos/kafka/Scheduler.scala').readLines

jar.archiveName = "kafka-mesos-${version}.jar"

def mesosVersion="0.25.0"
def kafkaVersion="0.8.1.1"
def kafkaScalaVersion="2.10"
def mesosVersion="0.28.1"
def kafkaVersion="0.10.0.0"
def kafkaScalaVersion="2.11"
def scalaVersion="2.11.6"

repositories {
mavenCentral()
Expand All @@ -34,7 +35,7 @@ sourceSets {

def jettyVersion = "9.0.4.v20130625"
dependencies {
compile "org.scala-lang:scala-library:2.10.6"
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.apache.mesos:mesos:$mesosVersion"
compile name: "util-mesos-0.1.0.0"
compile "com.google.protobuf:protobuf-java:2.5.0"
Expand Down
3 changes: 2 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#Mon Jun 13 09:11:33 CEST 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-all.zip
10 changes: 6 additions & 4 deletions src/scala/ly/stealth/mesos/kafka/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
package ly.stealth.mesos.kafka

import java.util

import scala.util.parsing.json.{JSONArray, JSONObject}
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import java.util.Collections
import java.io.{FileWriter, File}
import java.io.{File, FileWriter}

import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer
import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import net.elodina.mesos.util.Version

Expand Down Expand Up @@ -136,7 +138,7 @@ object Cluster {
class ZkStorage(val path: String) extends Storage {
createChrootIfRequired()

def zkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer)
def zkClient: ZkClient = ZkUtils.createZkClient(Config.zk, 30000, 30000)

private def createChrootIfRequired(): Unit = {
val slashIdx: Int = Config.zk.indexOf('/')
Expand All @@ -145,7 +147,7 @@ object Cluster {
val chroot = Config.zk.substring(slashIdx)
val zkConnect = Config.zk.substring(0, slashIdx)

val client = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
val client = ZkUtils.createZkClient(Config.zk, 30000, 30000)
try { client.createPersistent(chroot, true) }
finally { client.close() }
}
Expand Down
8 changes: 4 additions & 4 deletions src/scala/ly/stealth/mesos/kafka/Expr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ object Expr {
def expandTopics(expr: String): util.List[String] = {
val topics = new util.TreeSet[String]()

val zkClient = newZkClient
val zkUtils = newZkUtils
var allTopics: util.List[String] = null
try { allTopics = ZkUtils.getAllTopics(zkClient) }
finally { zkClient.close() }
try { allTopics = zkUtils.getAllTopics() }
finally { zkUtils.close() }

for (part <- expr.split(",").map(_.trim).filter(!_.isEmpty)) {
if (!part.endsWith("*")) topics.add(part)
Expand All @@ -189,5 +189,5 @@ object Expr {
out.println(" t* - topics starting with 't'")
}

private def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer)
private def newZkUtils: ZkUtils = ZkUtils(Config.zk, 30000, 30000, isZkSecurityEnabled = false)//todo: parametrize isZkSecurityEnabled
}
6 changes: 3 additions & 3 deletions src/scala/ly/stealth/mesos/kafka/Migration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object Migration {

private val values: util.TreeMap[Version, Migration] = new util.TreeMap[Version, Migration]()
private def add(m: Migration): Unit = { values.put(m.version, m); }
add(M_0_9_5_1)
add(M_0_10_0_0)

def all: util.List[Migration] = new util.ArrayList(values.values())

Expand All @@ -31,8 +31,8 @@ object Migration {
result
}

object M_0_9_5_1 extends Migration {
def version: Version = new Version("0.9.5.1")
object M_0_10_0_0 extends Migration {
def version: Version = new Version("0.10.0.0")

def apply(_json: Map[String, Object]): Map[String, Object] = {
val json = new mutable.HashMap[String, Object]() ++ _json
Expand Down
47 changes: 23 additions & 24 deletions src/scala/ly/stealth/mesos/kafka/Rebalancer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ import java.util

import scala.Some
import scala.collection.JavaConversions._
import scala.collection.{mutable, Seq, Map}

import scala.collection.{Map, Seq, mutable}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

import kafka.admin._
import kafka.common.TopicAndPartition
import kafka.utils.{ZkUtils, ZKStringSerializer}
import kafka.common.{AdminCommandFailedException, TopicAndPartition}
import kafka.utils.{ZKStringSerializer, ZkUtils}
import net.elodina.mesos.util.Period
import org.apache.log4j.Logger

Expand All @@ -38,10 +36,10 @@ class Rebalancer {
@volatile private var assignment: Map[TopicAndPartition, Seq[Int]] = null
@volatile private var reassignment: Map[TopicAndPartition, Seq[Int]] = null

private def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer)
private def newZkUtils: ZkUtils = ZkUtils(Config.zk, 30000, 30000, isZkSecurityEnabled = false)//todo: parametrize isZkSecurityEnabled

def running: Boolean = {
val zkClient = newZkClient
val zkClient = newZkUtils.zkClient
try { zkClient.exists(ZkUtils.ReassignPartitionsPath) }
finally { zkClient.close() }
}
Expand All @@ -50,26 +48,27 @@ class Rebalancer {
if (topics.isEmpty) throw new Rebalancer.Exception("no topics")

logger.info(s"Starting rebalance for topics ${topics.mkString(",")} on brokers ${brokers.mkString(",")} with ${if (replicas == -1) "<default>" else replicas} replicas")
val zkClient = newZkClient
val zkUtils = newZkUtils
try {
val assignment: Map[TopicAndPartition, Seq[Int]] = ZkUtils.getReplicaAssignmentForTopics(zkClient, topics)
val reassignment: Map[TopicAndPartition, Seq[Int]] = getReassignments(brokers.map(Integer.parseInt), topics, assignment, replicas)
val assignment: Map[TopicAndPartition, Seq[Int]] = zkUtils.getReplicaAssignmentForTopics(topics)
val reassignment: Map[TopicAndPartition, Seq[Int]] = getReassignments(brokers.map(b => BrokerMetadata(b.toInt, None)), topics, assignment, replicas)

reassignPartitions(zkUtils, reassignment)

reassignPartitions(zkClient, reassignment)
this.reassignment = reassignment
this.assignment = assignment
} finally {
zkClient.close()
zkUtils.close()
}
}

def state: String = {
if (assignment == null) return ""
var s = ""

val zkClient = newZkClient
val zkUtils = newZkUtils
try {
val reassigning: Map[TopicAndPartition, Seq[Int]] = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
val reassigning: Map[TopicAndPartition, Seq[Int]] = zkUtils.getPartitionsBeingReassigned.mapValues(_.newReplicas)

val byTopic: Map[String, Map[TopicAndPartition, Seq[Int]]] = assignment.groupBy(tp => tp._1.topic)
for (topic <- byTopic.keys.to[List].sorted) {
Expand All @@ -81,12 +80,12 @@ class Rebalancer {
s += " " + topicAndPartition.partition + ": " + brokers.mkString(",")
s += " -> "
s += reassignment.getOrElse(topicAndPartition, null).mkString(",")
s += " - " + getReassignmentState(zkClient, topicAndPartition, reassigning)
s += " - " + getReassignmentState(zkUtils, topicAndPartition, reassigning)
s += "\n"
}
}
} finally {
zkClient.close()
zkUtils.close()
}

s
Expand All @@ -105,7 +104,7 @@ class Rebalancer {
matches
}

private def getReassignments(brokerIds: Seq[Int], topics: util.List[String], assignment: Map[TopicAndPartition, Seq[Int]], replicas: Int = -1): Map[TopicAndPartition, Seq[Int]] = {
private def getReassignments(brokerMetadata: Seq[BrokerMetadata], topics: util.List[String], assignment: Map[TopicAndPartition, Seq[Int]], replicas: Int = -1): Map[TopicAndPartition, Seq[Int]] = {
var reassignment : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()

val byTopic: Map[String, Map[TopicAndPartition, Seq[Int]]] = assignment.groupBy(tp => tp._1.topic)
Expand All @@ -114,32 +113,32 @@ class Rebalancer {
val rf: Int = if (replicas != -1) replicas else entry._2.valuesIterator.next().size
val partitions: Int = entry._2.size

val assignedReplicas: Map[Int, Seq[Int]] = AdminUtils.assignReplicasToBrokers(brokerIds, partitions, rf, 0, 0)
val assignedReplicas: Map[Int, Seq[Int]] = AdminUtils.assignReplicasToBrokers(brokerMetadata, partitions, rf, 0, 0)
reassignment ++= assignedReplicas.map(replicaEntry => TopicAndPartition(topic, replicaEntry._1) -> replicaEntry._2)
}

reassignment.toMap
}

private def getReassignmentState(zkClient: ZkClient, topicAndPartition: TopicAndPartition, reassigning: Map[TopicAndPartition, Seq[Int]]): String = {
private def getReassignmentState(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition, reassigning: Map[TopicAndPartition, Seq[Int]]): String = {
reassigning.get(topicAndPartition) match {
case Some(partition) => "running"
case None =>
// check if the current replica assignment matches the expected one after reassignment
val assignedBrokers = reassignment(topicAndPartition)
val brokers = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
val brokers = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)

if(brokers.sorted == assignedBrokers.sorted) "done"
else "error"
}
}

private def reassignPartitions(zkClient: ZkClient, partitions: Map[TopicAndPartition, Seq[Int]]): Unit = {
private def reassignPartitions(zkUtils: ZkUtils, partitions: Map[TopicAndPartition, Seq[Int]]): Unit = {
try {
val json = ZkUtils.getPartitionReassignmentZkData(partitions)
ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, json)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitions)
reassignPartitionsCommand.reassignPartitions()
} catch {
case ze: ZkNodeExistsException => throw new Rebalancer.Exception("rebalance is in progress")
case ze: AdminCommandFailedException => throw new Rebalancer.Exception(ze.getMessage)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/scala/ly/stealth/mesos/kafka/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.mutable

object Scheduler extends org.apache.mesos.Scheduler {
private val logger: Logger = Logger.getLogger(this.getClass)
val version: Version = new Version("0.9.5.1")
val version: Version = new Version("0.10.0.0")

val cluster: Cluster = new Cluster()
private var driver: SchedulerDriver = null
Expand Down Expand Up @@ -419,7 +419,7 @@ object Scheduler extends org.apache.mesos.Scheduler {

credsBuilder = Credential.newBuilder()
credsBuilder.setPrincipal(Config.principal)
credsBuilder.setSecret(ByteString.copyFromUtf8(Config.secret))
credsBuilder.setSecret(Config.secret)
}

val driver =
Expand Down
32 changes: 16 additions & 16 deletions src/scala/ly/stealth/mesos/kafka/Topics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import ly.stealth.mesos.kafka.Topics.Topic
import kafka.log.LogConfig

class Topics {
private def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer)
private def newZkUtils: ZkUtils = ZkUtils(Config.zk, 30000, 30000, isZkSecurityEnabled = false)//todo: parametrize isZkSecurityEnabled

def getTopic(name: String): Topics.Topic = {
if (name == null) return null
Expand All @@ -41,13 +41,13 @@ class Topics {
}

def getTopics: util.List[Topics.Topic] = {
val zkClient = newZkClient
val zkUtils = newZkUtils

try {
var names = ZkUtils.getAllTopics(zkClient)
val names = zkUtils.getAllTopics

val assignments: mutable.Map[String, Map[Int, Seq[Int]]] = ZkUtils.getPartitionAssignmentForTopics(zkClient, names)
val configs = AdminUtils.fetchAllTopicConfigs(zkClient)
val assignments: mutable.Map[String, Map[Int, Seq[Int]]] = zkUtils.getPartitionAssignmentForTopics(names)
val configs = AdminUtils.fetchAllTopicConfigs(zkUtils)

val topics = new util.ArrayList[Topics.Topic]
for (name <- names.sorted)
Expand All @@ -59,20 +59,20 @@ class Topics {

topics
} finally {
zkClient.close()
zkUtils.close()
}
}

def fairAssignment(partitions: Int = 1, replicas: Int = 1, brokers: util.List[Int] = null): util.Map[Int, util.List[Int]] = {
var brokers_ = brokers

if (brokers_ == null) {
val zkClient = newZkClient
try { brokers_ = ZkUtils.getSortedBrokerList(zkClient)}
finally { zkClient.close() }
val zkUtils = newZkUtils
try { brokers_ = zkUtils.getSortedBrokerList()}
finally { zkUtils.close() }
}

AdminUtils.assignReplicasToBrokers(brokers_, partitions, replicas, 0, 0).mapValues(new util.ArrayList[Int](_))
AdminUtils.assignReplicasToBrokers(brokers_.map(b => BrokerMetadata(b, None)), partitions, replicas, 0, 0).mapValues(new util.ArrayList[Int](_))
}

def addTopic(name: String, assignment: util.Map[Int, util.List[Int]] = null, options: util.Map[String, String] = null): Topic = {
Expand All @@ -83,9 +83,9 @@ class Topics {
if (options != null)
for ((k, v) <- options) config.setProperty(k, v)

val zkClient = newZkClient
try { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, name, assignment_.mapValues(_.toList), config) }
finally { zkClient.close() }
val zkUtils = newZkUtils
try { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, name, assignment_.mapValues(_.toList), config) }
finally { zkUtils.close() }

getTopic(name)
}
Expand All @@ -94,9 +94,9 @@ class Topics {
val config: Properties = new Properties()
for ((k, v) <- options) config.setProperty(k, v)

val zkClient = newZkClient
try { AdminUtils.changeTopicConfig(zkClient, topic.name, config) }
finally { zkClient.close() }
val zkUtils = newZkUtils
try { AdminUtils.changeTopicConfig(zkUtils, topic.name, config) }
finally { zkUtils.close() }
}

def validateOptions(options: util.Map[String, String]): String = {
Expand Down
25 changes: 18 additions & 7 deletions src/test/ly/stealth/mesos/kafka/RebalancerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@

package ly.stealth.mesos.kafka

import org.junit.{Test, After, Before}
import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, ZkUtils}

import scala.collection.JavaConversions._
import java.util

import kafka.common.TopicAndPartition
import org.apache.log4j.Logger

class RebalancerTest extends KafkaMesosTestCase {
var rebalancer: Rebalancer = null
var zkClient: ZkClient = null
var zkUtils: ZkUtils = null

@Before
override def before {
Expand All @@ -37,8 +41,7 @@ class RebalancerTest extends KafkaMesosTestCase {
Config.zk = s"localhost:$port"

startZkServer()
zkClient = zkServer.getZkClient
zkClient.setZkSerializer(ZKStringSerializer)
zkUtils = ZkUtils(Config.zk, 3000, 3000, isZkSecurityEnabled = false)
}

@After
Expand All @@ -64,9 +67,17 @@ class RebalancerTest extends KafkaMesosTestCase {
@Test
def start_in_progress {
Scheduler.cluster.topics.addTopic("topic", Map(0 -> util.Arrays.asList(0), 1 -> util.Arrays.asList(0)))
ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, "")

try { rebalancer.start(util.Arrays.asList("t1"), util.Arrays.asList("0", "1")); fail() }
catch { case e: Rebalancer.Exception => assertTrue(e.getMessage, e.getMessage.contains("in progress")) }
val partitionsReassignmentData = Map(TopicAndPartition("topic", 0) -> Seq(0,1))
val jsonReassignmentData = zkUtils.formatAsReassignmentJson(partitionsReassignmentData)
zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)

try {
rebalancer.start(util.Arrays.asList("topic"), util.Arrays.asList("0", "1"))
fail()
}
catch {
case e: Rebalancer.Exception => assertTrue(e.getMessage, e.getMessage.contains("in progress"))
}
}
}

0 comments on commit d1edf72

Please sign in to comment.