From 5c31a4662521d17f1959a9878a99c014b32f9683 Mon Sep 17 00:00:00 2001 From: Christian Nguyen Van Than Date: Wed, 15 Jul 2015 15:35:30 +0200 Subject: [PATCH 1/2] fix bad logging message --- src/main/java/com/pinterest/secor/common/OffsetTracker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/pinterest/secor/common/OffsetTracker.java b/src/main/java/com/pinterest/secor/common/OffsetTracker.java index 8fff6eb70..4a390ca84 100644 --- a/src/main/java/com/pinterest/secor/common/OffsetTracker.java +++ b/src/main/java/com/pinterest/secor/common/OffsetTracker.java @@ -52,10 +52,10 @@ public long setLastSeenOffset(TopicPartition topicPartition, long offset) { mLastSeenOffset.put(topicPartition, offset); if (lastSeenOffset + 1 != offset) { if (lastSeenOffset >= 0) { - LOG.warn("offset for topic {} partition {} changed from {} to {}", + LOG.warn("offset for topic {} partition {} changed from {} to {}", topicPartition.getTopic(),topicPartition.getPartition(),lastSeenOffset, offset); } else { - LOG.info("starting to consume topic {} partition from offset {}", + LOG.info("starting to consume topic {} partition {} from offset {}", topicPartition.getTopic(),topicPartition.getPartition(),offset); } } From 42ab0a61ea1a4353b614cb2ff37638fc1893d658 Mon Sep 17 00:00:00 2001 From: Yejun Yang Date: Tue, 12 May 2015 11:41:09 -0700 Subject: [PATCH 2/2] update kafka version fix test for newer kafka fix test script use roundbin assignment --- pom.xml | 2 +- .../pinterest/secor/reader/MessageReader.java | 1 + src/main/scripts/run_tests.sh | 25 ++++++++----------- src/main/scripts/run_zookeeper_command.sh | 5 ---- 4 files changed, 13 insertions(+), 20 deletions(-) diff --git a/pom.xml b/pom.xml index 069085186..963f7a091 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,7 @@ org.apache.kafka kafka_2.10 - 0.8.1.1 + 0.8.2.1 org.slf4j diff --git a/src/main/java/com/pinterest/secor/reader/MessageReader.java b/src/main/java/com/pinterest/secor/reader/MessageReader.java index 245320346..11ff76214 100644 --- a/src/main/java/com/pinterest/secor/reader/MessageReader.java +++ b/src/main/java/com/pinterest/secor/reader/MessageReader.java @@ -110,6 +110,7 @@ private ConsumerConfig createConsumerConfig() throws UnknownHostException { props.put("auto.offset.reset", "smallest"); props.put("consumer.timeout.ms", Integer.toString(mConfig.getConsumerTimeoutMs())); props.put("consumer.id", IdUtil.getConsumerId()); + props.put("partition.assignment.strategy", "roundrobin"); if (mConfig.getRebalanceMaxRetries() != null && !mConfig.getRebalanceMaxRetries().isEmpty()) { props.put("rebalance.max.retries", mConfig.getRebalanceMaxRetries()); diff --git a/src/main/scripts/run_tests.sh b/src/main/scripts/run_tests.sh index 6a021ee32..8034fb088 100755 --- a/src/main/scripts/run_tests.sh +++ b/src/main/scripts/run_tests.sh @@ -73,7 +73,7 @@ check_for_native_libs() { recreate_dirs() { run_command "rm -r -f ${PARENT_DIR}" - if [ -n ${SECOR_LOCAL_S3} ]; then + if [ -n "${SECOR_LOCAL_S3}" ]; then run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del" else run_command "s3cmd del --recursive ${S3_LOGS_DIR}" @@ -85,7 +85,7 @@ recreate_dirs() { } start_s3() { - if [ -n ${SECOR_LOCAL_S3} ]; then + if [ -n "${SECOR_LOCAL_S3}" ]; then if command -v fakes3 > /dev/null 2>&1; then run_command "fakes3 --root=/tmp/fakes3 --port=5000 --hostname=localhost > /tmp/fakes3.log 2>&1 &" sleep 2 @@ -97,7 +97,7 @@ start_s3() { } stop_s3() { - if [ -n ${SECOR_LOCAL_S3} ]; then + if [ -n "${SECOR_LOCAL_S3}" ]; then run_command "pkill -9 'fakes3' > /dev/null 2>&1 || true" run_command "rm -r -f /tmp/fakes3" fi @@ -175,17 +175,14 @@ verify() { set_offsets_in_zookeeper() { for group in secor_backup secor_partition; do for partition in 0 1; do - run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \ - /consumers \'\' > ${LOGS_DIR}/run_zookeeper_command.log 2>&1" - run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \ - /consumers/${group} \'\' > ${LOGS_DIR}/run_zookeeper_command.log 2>&1" - run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \ - /consumers/${group}/offsets \'\' > ${LOGS_DIR}/run_zookeeper_command.log 2>&1" - run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \ - /consumers/${group}/offsets/test \'\' > ${LOGS_DIR}/run_zookeeper_command.log 2>&1" - run_command "${BASE_DIR}/run_zookeeper_command.sh localhost:2181 create \ - /consumers/${group}/offsets/test/${partition} $1 > \ - ${LOGS_DIR}/run_zookeeper_command.log 2>&1" + cat < ${LOGS_DIR}/run_zookeeper_command.log 2>&1" +create /consumers '' +create /consumers/${group} '' +create /consumers/${group}/offsets '' +create /consumers/${group}/offsets/test '' +create /consumers/${group}/offsets/test/${partition} $1 +quit +EOF done done } diff --git a/src/main/scripts/run_zookeeper_command.sh b/src/main/scripts/run_zookeeper_command.sh index 178b92ada..873737b95 100755 --- a/src/main/scripts/run_zookeeper_command.sh +++ b/src/main/scripts/run_zookeeper_command.sh @@ -17,11 +17,6 @@ # Author: Pawel Garbacki (pawel@pinterest.com) -if [ $# -lt 3 ]; then - echo "USAGE: $0 zookeeper_host:port cmd args" - exit 1 -fi - CURR_DIR=`dirname $0` source ${CURR_DIR}/run_common.sh