Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to kafka 0.10.2.1 #7

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
FROM wurstmeister/kafka:0.10.0.1
FROM wurstmeister/kafka:0.10.2.1

MAINTAINER CloudTrackInc
MAINTAINER dawidCh

RUN java -version

ADD . /tmp/build
RUN cd /tmp/build && \
./gradlew -Dorg.gradle.native=false build && \
cp build/libs/kubernetes-expander-1.0-SNAPSHOT.jar $KAFKA_HOME/libs/
cp build/libs/kubernetes-expander-2.0-SNAPSHOT.jar $KAFKA_HOME/libs/

ADD kafka-autoextend-partitions.sh /usr/bin/kafka-autoextend-partitions.sh
ADD kafka-kubernetes-start.sh /usr/bin/kafka-kubernetes-start.sh
RUN echo delete.topic.enable=true >> /opt/kafka/config/server.properties
RUN listeners=PLAINTEXTX://0.0.0.0:9092 >> /opt/kafka/config/server.properties

CMD ["kafka-kubernetes-start.sh"]
12 changes: 6 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group 'org.cloudtrack.kubetrack'
version '1.0-SNAPSHOT'
version '2.0-SNAPSHOT'

apply plugin: 'java'

Expand All @@ -8,11 +8,11 @@ repositories {
}

dependencies {
compile 'org.scala-lang:scala-library:2.11.7'
compile 'com.101tec:zkclient:0.7'
compile 'org.apache.kafka:kafka_2.11:0.10.0.1'
compile 'org.scala-lang:scala-parser-combinators:2.11.0-M4'
testCompile group: 'junit', name: 'junit', version: '4.11'
compile 'org.scala-lang:scala-library:2.12.2'
compile 'com.101tec:zkclient:0.10'
compile 'org.apache.kafka:kafka_2.12:0.10.2.1'
compile 'org.scala-lang.modules:scala-parser-combinators_2.12:1.0.4'
testCompile group: 'junit', name: 'junit', version: '4.12'
}

allprojects {
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Fri Mar 11 14:44:25 MSK 2016
#Tue Jul 04 11:45:52 CEST 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.11-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.11-all.zip
4 changes: 2 additions & 2 deletions kafka-rc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ spec:
spec:
containers:
- name: kafka
image: cloudtrackinc/kubernetes-kafka:latest
image: dawidch/kubernetes-kafka:0.10.2.1
ports:
- containerPort: 9092
env:
Expand All @@ -36,4 +36,4 @@ spec:
- name: KAFKA_ZOOKEEPER_CONNECT
value: kafka-zoo-svc:2181
- name: KAFKA_CREATE_TOPICS
value: demo-topic:16:1
value: demo-topic:16:1
20 changes: 8 additions & 12 deletions src/main/scala/kafka/admin/AutoExpand.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
package kafka.admin

import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.{Collections, List => JList}

import joptsimple.OptionParser
import kafka.cluster.BrokerEndPoint
import kafka.common.{TopicAndPartition, AdminCommandFailedException}
import kafka.common.{AdminCommandFailedException, TopicAndPartition}
import kafka.utils._
import org.apache.kafka.common.protocol.SecurityProtocol
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.CreateMode

import collection._
import _root_.scala.collection.JavaConversions._
import java.util.{Collections, List => JList}

import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
import org.apache.zookeeper.CreateMode
import scala.collection._


object AutoExpandCommand {
Expand Down Expand Up @@ -48,7 +45,6 @@ object AutoExpandCommand {
val mNode = zkUtils.zkClient.createEphemeralSequential(KAFKA_POD_MASTER+"/pod-", uid)
val index = sequnce(mNode)
case class Broker(index: Int, id: String) extends Ordered[Broker] {
import scala.math.Ordered.orderingToOrdered

def compare(that: Broker): Int = this.index.compareTo(that.index)
}
Expand Down Expand Up @@ -184,8 +180,8 @@ object AutoExpandCommand {
}

var (partitionsToBeReassigned, currentAssignments) = generateAssignment(zkUtils, newBrokersIds, topics, true)
println("Current partition replica assignment\n\n%s".format(zkUtils.formatAsReassignmentJson(currentAssignments)))
println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned)))
println("Current partition replica assignment\n\n%s".format(ZkUtils.formatAsReassignmentJson(currentAssignments)))
println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.formatAsReassignmentJson(partitionsToBeReassigned)))
val exec = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned)
if (exec.reassignPartitions()){
var inProgress = true
Expand Down Expand Up @@ -215,7 +211,7 @@ object AutoExpandCommand {
attemt+=1
}
if (inProgress){
println("Timeout reassignment partitions:\n\n%s".format(zkUtils.formatAsReassignmentJson(currentAssignments)))
println("Timeout reassignment partitions:\n\n%s".format(ZkUtils.formatAsReassignmentJson(currentAssignments)))
} else{
println("Reassignment compleated!!!")
}
Expand Down