Skip to content

Commit

Permalink
Merge pull request pinterest#100 from yyejun/kafka_upgrade
Browse files Browse the repository at this point in the history
Kafka upgrade and use roundrobin assignment
  • Loading branch information
pgarbacki committed Jul 15, 2015
2 parents 9764ff6 + 42ab0a6 commit d7679c6
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<version>0.8.2.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ private ConsumerConfig createConsumerConfig() throws UnknownHostException {
props.put("auto.offset.reset", "smallest");
props.put("consumer.timeout.ms", Integer.toString(mConfig.getConsumerTimeoutMs()));
props.put("consumer.id", IdUtil.getConsumerId());
props.put("partition.assignment.strategy", "roundrobin");
if (mConfig.getRebalanceMaxRetries() != null &&
!mConfig.getRebalanceMaxRetries().isEmpty()) {
props.put("rebalance.max.retries", mConfig.getRebalanceMaxRetries());
Expand Down
25 changes: 11 additions & 14 deletions src/main/scripts/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ check_for_native_libs() {

recreate_dirs() {
run_command "rm -r -f ${PARENT_DIR}"
if [ -n ${SECOR_LOCAL_S3} ]; then
if [ -n "${SECOR_LOCAL_S3}" ]; then
run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del"
else
run_command "s3cmd del --recursive ${S3_LOGS_DIR}"
Expand All @@ -85,7 +85,7 @@ recreate_dirs() {
}

start_s3() {
if [ -n ${SECOR_LOCAL_S3} ]; then
if [ -n "${SECOR_LOCAL_S3}" ]; then
if command -v fakes3 > /dev/null 2>&1; then
run_command "fakes3 --root=/tmp/fakes3 --port=5000 --hostname=localhost > /tmp/fakes3.log 2>&1 &"
sleep 2
Expand All @@ -97,7 +97,7 @@ start_s3() {
}

stop_s3() {
if [ -n ${SECOR_LOCAL_S3} ]; then
if [ -n "${SECOR_LOCAL_S3}" ]; then
run_command "pkill -9 'fakes3' > /dev/null 2>&1 || true"
run_command "rm -r -f /tmp/fakes3"
fi
Expand Down Expand Up @@ -175,17 +175,14 @@ verify() {
set_offsets_in_zookeeper() {
for group in secor_backup secor_partition; do
for partition in 0 1; do
run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \
/consumers \'\' > ${LOGS_DIR}/run_zookeeper_command.log 2>&1"
run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \
/consumers/${group} \'\' > ${LOGS_DIR}/run_zookeeper_command.log 2>&1"
run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \
/consumers/${group}/offsets \'\' > ${LOGS_DIR}/run_zookeeper_command.log 2>&1"
run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \
/consumers/${group}/offsets/test \'\' > ${LOGS_DIR}/run_zookeeper_command.log 2>&1"
run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \
/consumers/${group}/offsets/test/${partition} $1 > \
${LOGS_DIR}/run_zookeeper_command.log 2>&1"
cat <<EOF | run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 > ${LOGS_DIR}/run_zookeeper_command.log 2>&1"
create /consumers ''
create /consumers/${group} ''
create /consumers/${group}/offsets ''
create /consumers/${group}/offsets/test ''
create /consumers/${group}/offsets/test/${partition} $1
quit
EOF
done
done
}
Expand Down
5 changes: 0 additions & 5 deletions src/main/scripts/run_zookeeper_command.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

# Author: Pawel Garbacki ([email protected])

if [ $# -lt 3 ]; then
echo "USAGE: $0 zookeeper_host:port cmd args"
exit 1
fi

CURR_DIR=`dirname $0`
source ${CURR_DIR}/run_common.sh

Expand Down

0 comments on commit d7679c6

Please sign in to comment.