Skip to content

Commit

Permalink
Introduced new property 'secor.max.file.age.policy' to either check f…
Browse files Browse the repository at this point in the history
…or the oldest file or the newest file in the topic partition to compute file max age seconds
  • Loading branch information
SanthoshVasabhaktula committed Jul 29, 2015
1 parent cc8b2b3 commit 47911e0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
9 changes: 8 additions & 1 deletion src/main/java/com/pinterest/secor/common/FileRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.pinterest.secor.util.ReflectionUtil;
import com.pinterest.secor.util.StatsUtil;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -207,6 +208,7 @@ public long getSize(TopicPartition topicPartition) throws IOException {
public long getModificationAgeSec(TopicPartition topicPartition) throws IOException {
long now = System.currentTimeMillis() / 1000L;
long result = Long.MAX_VALUE;
boolean useOldestFile = StringUtils.equals("oldest", mConfig.getMaxFileAgePolicy());
Collection<LogFilePath> paths = getPaths(topicPartition);
for (LogFilePath path : paths) {
Long creationTime = mCreationTimes.get(path);
Expand All @@ -215,7 +217,11 @@ public long getModificationAgeSec(TopicPartition topicPartition) throws IOExcept
creationTime = now;
}
long age = now - creationTime;
if (age < result) {
if(result == Long.MAX_VALUE) {
result = age;
} else if (!useOldestFile && age < result) {
result = age;
} else if (useOldestFile && age > result) {
result = age;
}
}
Expand All @@ -226,4 +232,5 @@ public long getModificationAgeSec(TopicPartition topicPartition) throws IOExcept
topicPartition.getPartition(), Long.toString(result));
return result;
}

}
4 changes: 4 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public long getMaxFileSizeBytes() {
public long getMaxFileAgeSeconds() {
return getLong("secor.max.file.age.seconds");
}

public String getMaxFileAgePolicy() {
return getString("secor.max.file.age.policy");
}

public long getOffsetsPerPartition() {
return getLong("secor.offsets.per.partition");
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/pinterest/secor/uploader/Uploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ private void trimFiles(TopicPartition topicPartition, long startOffset) throws E
private void checkTopicPartition(TopicPartition topicPartition) throws Exception {
final long size = mFileRegistry.getSize(topicPartition);
final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition);
LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec);
if (size >= mConfig.getMaxFileSizeBytes() ||
modificationAgeSec >= mConfig.getMaxFileAgeSeconds()) {
long maxFileAgeSeconds = mConfig.getMaxFileAgeSeconds();
LOG.debug("size: " + size + " | maxFileSizeBytes: " + mConfig.getMaxFileSizeBytes() + " | modificationAge: " + modificationAgeSec + " | maxFileAgeSeconds: " + maxFileAgeSeconds);
if (size >= mConfig.getMaxFileSizeBytes() || modificationAgeSec >= maxFileAgeSeconds) {
long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
long oldOffsetCount = mOffsetTracker.setCommittedOffsetCount(topicPartition,
newOffsetCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void setUp() throws Exception {
PropertiesConfiguration properties = new PropertiesConfiguration();
properties.addProperty("secor.file.reader.writer.factory",
"com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory");
properties.addProperty("secor.max.file.age.policy", "");
SecorConfig secorConfig = new SecorConfig(properties);
mRegistry = new FileRegistry(secorConfig);
mLogFilePath = new LogFilePath("/some_parent_dir", PATH);
Expand Down

0 comments on commit 47911e0

Please sign in to comment.