From 1d3d322ff909c41380c7d827ac5d8826cf43a793 Mon Sep 17 00:00:00 2001 From: Ramki Venkatachalam Date: Fri, 15 May 2015 09:13:23 -0700 Subject: [PATCH] adding stats counter for rebalance --- .../java/com/pinterest/secor/util/StatsUtil.java | 5 +++++ .../com/pinterest/secor/writer/MessageWriter.java | 15 +++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/pinterest/secor/util/StatsUtil.java b/src/main/java/com/pinterest/secor/util/StatsUtil.java index 9cb924dad..79f1920fb 100644 --- a/src/main/java/com/pinterest/secor/util/StatsUtil.java +++ b/src/main/java/com/pinterest/secor/util/StatsUtil.java @@ -35,4 +35,9 @@ public static void clearLabel(String name) { name += "." + threadId; Stats.clearLabel(name); } + + public static void incr(String name) { + Stats.incr(name); + } + } diff --git a/src/main/java/com/pinterest/secor/writer/MessageWriter.java b/src/main/java/com/pinterest/secor/writer/MessageWriter.java index c56c5c7b1..0b46058be 100644 --- a/src/main/java/com/pinterest/secor/writer/MessageWriter.java +++ b/src/main/java/com/pinterest/secor/writer/MessageWriter.java @@ -16,22 +16,24 @@ */ package com.pinterest.secor.writer; -import com.pinterest.secor.common.*; +import com.pinterest.secor.common.FileRegistry; +import com.pinterest.secor.common.LogFilePath; +import com.pinterest.secor.common.OffsetTracker; +import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.common.TopicPartition; import com.pinterest.secor.io.FileWriter; import com.pinterest.secor.io.KeyValue; -import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; import com.pinterest.secor.message.ParsedMessage; - -import java.io.IOException; - import com.pinterest.secor.util.CompressionUtil; import com.pinterest.secor.util.IdUtil; - +import com.pinterest.secor.util.StatsUtil; import org.apache.hadoop.io.compress.CompressionCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** * Message writer appends Kafka messages to local log files. * @@ -66,6 +68,7 @@ public void adjustOffset(Message message) throws IOException { message.getKafkaPartition()); long lastSeenOffset = mOffsetTracker.getLastSeenOffset(topicPartition); if (message.getOffset() != lastSeenOffset + 1) { + StatsUtil.incr("secor.consumer_rebalance_count." + topicPartition.getTopic()); // There was a rebalancing event since we read the last message. LOG.debug("offset of message " + message + " does not follow sequentially the last seen offset " + lastSeenOffset +