From 47911e0bd4df6ebddd2a919a4b833569aa4ea6d2 Mon Sep 17 00:00:00 2001 From: SanthoshVasabhaktula Date: Wed, 29 Jul 2015 23:59:26 +0530 Subject: [PATCH] Introduced new property 'secor.max.file.age.policy' to either check for the oldest file or the newest file in the topic partition to compute file max age seconds --- .../java/com/pinterest/secor/common/FileRegistry.java | 9 ++++++++- .../java/com/pinterest/secor/common/SecorConfig.java | 4 ++++ src/main/java/com/pinterest/secor/uploader/Uploader.java | 6 +++--- .../com/pinterest/secor/common/FileRegistryTest.java | 1 + 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/pinterest/secor/common/FileRegistry.java b/src/main/java/com/pinterest/secor/common/FileRegistry.java index 98b2520a2..0350481d6 100644 --- a/src/main/java/com/pinterest/secor/common/FileRegistry.java +++ b/src/main/java/com/pinterest/secor/common/FileRegistry.java @@ -21,6 +21,7 @@ import com.pinterest.secor.util.ReflectionUtil; import com.pinterest.secor.util.StatsUtil; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,6 +208,7 @@ public long getSize(TopicPartition topicPartition) throws IOException { public long getModificationAgeSec(TopicPartition topicPartition) throws IOException { long now = System.currentTimeMillis() / 1000L; long result = Long.MAX_VALUE; + boolean useOldestFile = StringUtils.equals("oldest", mConfig.getMaxFileAgePolicy()); Collection paths = getPaths(topicPartition); for (LogFilePath path : paths) { Long creationTime = mCreationTimes.get(path); @@ -215,7 +217,11 @@ public long getModificationAgeSec(TopicPartition topicPartition) throws IOExcept creationTime = now; } long age = now - creationTime; - if (age < result) { + if(result == Long.MAX_VALUE) { + result = age; + } else if (!useOldestFile && age < result) { + result = age; + } else if (useOldestFile && age > result) { result = age; } } @@ -226,4 +232,5 @@ public long getModificationAgeSec(TopicPartition topicPartition) throws IOExcept topicPartition.getPartition(), Long.toString(result)); return result; } + } diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index aa81e77b7..acb6d9c1f 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -126,6 +126,10 @@ public long getMaxFileSizeBytes() { public long getMaxFileAgeSeconds() { return getLong("secor.max.file.age.seconds"); } + + public String getMaxFileAgePolicy() { + return getString("secor.max.file.age.policy"); + } public long getOffsetsPerPartition() { return getLong("secor.offsets.per.partition"); diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java index 2248238e2..1ad687f8b 100644 --- a/src/main/java/com/pinterest/secor/uploader/Uploader.java +++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java @@ -200,9 +200,9 @@ private void trimFiles(TopicPartition topicPartition, long startOffset) throws E private void checkTopicPartition(TopicPartition topicPartition) throws Exception { final long size = mFileRegistry.getSize(topicPartition); final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition); - LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec); - if (size >= mConfig.getMaxFileSizeBytes() || - modificationAgeSec >= mConfig.getMaxFileAgeSeconds()) { + long maxFileAgeSeconds = mConfig.getMaxFileAgeSeconds(); + LOG.debug("size: " + size + " | maxFileSizeBytes: " + mConfig.getMaxFileSizeBytes() + " | modificationAge: " + modificationAgeSec + " | maxFileAgeSeconds: " + maxFileAgeSeconds); + if (size >= mConfig.getMaxFileSizeBytes() || modificationAgeSec >= maxFileAgeSeconds) { long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition); long oldOffsetCount = mOffsetTracker.setCommittedOffsetCount(topicPartition, newOffsetCount); diff --git a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java index 021437cca..7cfddcf2b 100644 --- a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java +++ b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java @@ -57,6 +57,7 @@ public void setUp() throws Exception { PropertiesConfiguration properties = new PropertiesConfiguration(); properties.addProperty("secor.file.reader.writer.factory", "com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory"); + properties.addProperty("secor.max.file.age.policy", ""); SecorConfig secorConfig = new SecorConfig(properties); mRegistry = new FileRegistry(secorConfig); mLogFilePath = new LogFilePath("/some_parent_dir", PATH);