Skip to content

Commit

Permalink
adding stats counter for rebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramki Venkatachalam committed May 15, 2015
1 parent ce44dd8 commit 1d3d322
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
5 changes: 5 additions & 0 deletions src/main/java/com/pinterest/secor/util/StatsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@ public static void clearLabel(String name) {
name += "." + threadId;
Stats.clearLabel(name);
}

public static void incr(String name) {
Stats.incr(name);
}

}
15 changes: 9 additions & 6 deletions src/main/java/com/pinterest/secor/writer/MessageWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,24 @@
*/
package com.pinterest.secor.writer;

import com.pinterest.secor.common.*;
import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.io.FileWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;

import java.io.IOException;

import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.IdUtil;

import com.pinterest.secor.util.StatsUtil;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* Message writer appends Kafka messages to local log files.
*
Expand Down Expand Up @@ -66,6 +68,7 @@ public void adjustOffset(Message message) throws IOException {
message.getKafkaPartition());
long lastSeenOffset = mOffsetTracker.getLastSeenOffset(topicPartition);
if (message.getOffset() != lastSeenOffset + 1) {
StatsUtil.incr("secor.consumer_rebalance_count." + topicPartition.getTopic());
// There was a rebalancing event since we read the last message.
LOG.debug("offset of message " + message +
" does not follow sequentially the last seen offset " + lastSeenOffset +
Expand Down

0 comments on commit 1d3d322

Please sign in to comment.