Skip to content

Commit

Permalink
Merge branch 'master' into staging
Browse files Browse the repository at this point in the history
  • Loading branch information
SanthoshVasabhaktula committed Jul 29, 2015
2 parents 6fb6015 + 47911e0 commit 9e7424b
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 13 deletions.
8 changes: 4 additions & 4 deletions src/main/config/secor.dev.partition.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ include=secor.dev.properties
secor.kafka.group=secor_partition

# Parser class that extracts s3 partitions from consumed messages.
secor.message.parser.class=com.pinterest.secor.parser.PatternDateMessageParser
secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser

# S3 path where sequence files are stored.
secor.s3.path=secor_dev
secor.s3.path=secor_dev/partition

# Local path where sequence files are stored before they are uploaded to s3.
secor.local.path=/tmp/secor_dev/message_logs/partition

# Port of the Ostrich server.
ostrich.port=9998

# Partition Date Output format. This is used along with PatternDateMessageParser.
secor.partition.output_dt_format=dd_MM_yyyy
# Partition date output format, used together with PatternDateMessageParser. Defaults to 'yyyy-MM-dd' when left empty. *New*
secor.partition.output_dt_format=
12 changes: 7 additions & 5 deletions src/main/config/secor.dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ include=secor.common.properties
############

# Name of the s3 bucket where log files are stored.
secor.s3.bucket=lpdev-ekstep
secor.s3.bucket=

################
# END MUST SET #
Expand All @@ -17,10 +17,12 @@ kafka.seed.broker.port=9092
zookeeper.quorum=localhost:2181

# Upload policies.
# 1G (1,000,000,000 bytes)
secor.max.file.size.bytes=1000000000
# 1 minute
secor.max.file.age.seconds=60
# 10K
secor.max.file.size.bytes=10000
# 10 seconds
secor.max.file.age.seconds=10
# Policy for computing the max file age of a topic partition: set to 'oldest' to use the oldest file;
# any other value (including empty) uses the newest file, which is the default.
secor.max.file.age.policy=

# Output file pattern excluding prefix. Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz.
# Available placeholders are
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/com/pinterest/secor/common/FileRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.pinterest.secor.util.ReflectionUtil;
import com.pinterest.secor.util.StatsUtil;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -207,6 +208,7 @@ public long getSize(TopicPartition topicPartition) throws IOException {
public long getModificationAgeSec(TopicPartition topicPartition) throws IOException {
long now = System.currentTimeMillis() / 1000L;
long result = Long.MAX_VALUE;
boolean useOldestFile = StringUtils.equals("oldest", mConfig.getMaxFileAgePolicy());
Collection<LogFilePath> paths = getPaths(topicPartition);
for (LogFilePath path : paths) {
Long creationTime = mCreationTimes.get(path);
Expand All @@ -215,7 +217,11 @@ public long getModificationAgeSec(TopicPartition topicPartition) throws IOExcept
creationTime = now;
}
long age = now - creationTime;
if (age < result) {
if(result == Long.MAX_VALUE) {
result = age;
} else if (!useOldestFile && age < result) {
result = age;
} else if (useOldestFile && age > result) {
result = age;
}
}
Expand All @@ -226,4 +232,5 @@ public long getModificationAgeSec(TopicPartition topicPartition) throws IOExcept
topicPartition.getPartition(), Long.toString(result));
return result;
}

}
4 changes: 4 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public long getMaxFileSizeBytes() {
/**
 * Looks up the {@code secor.max.file.age.seconds} property as a long.
 *
 * @return the configured value of {@code secor.max.file.age.seconds}
 */
public long getMaxFileAgeSeconds() {
    final long maxFileAgeSeconds = getLong("secor.max.file.age.seconds");
    return maxFileAgeSeconds;
}

/**
 * Looks up the {@code secor.max.file.age.policy} property as a string.
 *
 * @return the configured value of {@code secor.max.file.age.policy}
 */
public String getMaxFileAgePolicy() {
    final String maxFileAgePolicy = getString("secor.max.file.age.policy");
    return maxFileAgePolicy;
}

public long getOffsetsPerPartition() {
return getLong("secor.offsets.per.partition");
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/pinterest/secor/uploader/Uploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ private void trimFiles(TopicPartition topicPartition, long startOffset) throws E
private void checkTopicPartition(TopicPartition topicPartition) throws Exception {
final long size = mFileRegistry.getSize(topicPartition);
final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition);
LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec);
if (size >= mConfig.getMaxFileSizeBytes() ||
modificationAgeSec >= mConfig.getMaxFileAgeSeconds()) {
long maxFileAgeSeconds = mConfig.getMaxFileAgeSeconds();
LOG.debug("size: " + size + " | maxFileSizeBytes: " + mConfig.getMaxFileSizeBytes() + " | modificationAge: " + modificationAgeSec + " | maxFileAgeSeconds: " + maxFileAgeSeconds);
if (size >= mConfig.getMaxFileSizeBytes() || modificationAgeSec >= maxFileAgeSeconds) {
long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
long oldOffsetCount = mOffsetTracker.setCommittedOffsetCount(topicPartition,
newOffsetCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void setUp() throws Exception {
PropertiesConfiguration properties = new PropertiesConfiguration();
properties.addProperty("secor.file.reader.writer.factory",
"com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory");
properties.addProperty("secor.max.file.age.policy", "");
SecorConfig secorConfig = new SecorConfig(properties);
mRegistry = new FileRegistry(secorConfig);
mLogFilePath = new LogFilePath("/some_parent_dir", PATH);
Expand Down

0 comments on commit 9e7424b

Please sign in to comment.