diff --git a/src/main/config/secor.dev.partition.properties b/src/main/config/secor.dev.partition.properties index d029c86d0..24af2c9f1 100644 --- a/src/main/config/secor.dev.partition.properties +++ b/src/main/config/secor.dev.partition.properties @@ -19,10 +19,10 @@ include=secor.dev.properties secor.kafka.group=secor_partition # Parser class that extracts s3 partitions from consumed messages. -secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser +secor.message.parser.class=com.pinterest.secor.parser.PatternDateMessageParser # S3 path where sequence files are stored. -secor.s3.path=secor_dev/partition +secor.s3.path=secor_dev # Local path where sequence files are stored before they are uploaded to s3. secor.local.path=/tmp/secor_dev/message_logs/partition @@ -30,3 +30,5 @@ secor.local.path=/tmp/secor_dev/message_logs/partition # Port of the Ostrich server. ostrich.port=9998 +# Partition Date Output format. This is used along with PatternDateMessageParser. +secor.partition.output_dt_format=dd_MM_yyyy \ No newline at end of file diff --git a/src/main/config/secor.dev.properties b/src/main/config/secor.dev.properties index ec30afa71..a53fea7b9 100644 --- a/src/main/config/secor.dev.properties +++ b/src/main/config/secor.dev.properties @@ -5,7 +5,7 @@ include=secor.common.properties ############ # Name of the s3 bucket where log files are stored. -secor.s3.bucket= +secor.s3.bucket=lpdev-ekstep ################ # END MUST SET # @@ -17,8 +17,17 @@ kafka.seed.broker.port=9092 zookeeper.quorum=localhost:2181 # Upload policies. -# 10K -secor.max.file.size.bytes=10000 -# 10 seconds -secor.max.file.age.seconds=10 +# 1G +secor.max.file.size.bytes=1000000000 +# 1 minute +secor.max.file.age.seconds=60 +# Output file pattern excluding prefix. Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz.
+# Available placeholders are +# topic - The topic the data is fetched from +# partition - The first partition extracted from the message +# generation - Generation +# kafkaPartition - The kafka partition +# fmOffset - Offset of the first message in the file. +# randomHex - A 4-character random hex string to include in the file name +secor.s3.output_file_pattern={randomHex}_{partition}_{topic}_{generation}.json \ No newline at end of file diff --git a/src/main/java/com/pinterest/secor/common/LogFilePath.java b/src/main/java/com/pinterest/secor/common/LogFilePath.java index 753288c0d..da40b3995 100644 --- a/src/main/java/com/pinterest/secor/common/LogFilePath.java +++ b/src/main/java/com/pinterest/secor/common/LogFilePath.java @@ -17,10 +17,16 @@ package com.pinterest.secor.common; import com.pinterest.secor.message.ParsedMessage; + import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.text.StrSubstitutor; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; /** * LogFilePath represents path of a log file.
It contains convenience method for building and
@@ -49,6 +55,10 @@ public class LogFilePath {
     private final int mKafkaPartition;
     private final long mOffset;
     private final String mExtension;
+    // Optional output file pattern; null unless set by the config-aware constructor.
+    private String mOutputFilePattern;
+    // Random placeholder value, resolved once so repeated path lookups agree.
+    private String mRandomHex;
 
     public LogFilePath(String prefix, int generation, long lastCommittedOffset,
                        ParsedMessage message, String extension) {
@@ -71,6 +81,24 @@ public LogFilePath(String prefix, String topic, String[] partitions, int generat
         mOffset = offset;
         mExtension = extension;
     }
+
+    /**
+     * Creates a path whose file name may follow a configured output pattern.
+     *
+     * @param config supplies the pattern via {@code getS3OutputFilePattern()};
+     *               a blank pattern leaves the default path layout in effect
+     */
+    public LogFilePath(String prefix, String topic, String[] partitions, int generation,
+                       int kafkaPartition, long offset, String extension, SecorConfig config) {
+        mPrefix = prefix;
+        mTopic = topic;
+        mPartitions = partitions;
+        mGeneration = generation;
+        mKafkaPartition = kafkaPartition;
+        mOffset = offset;
+        mExtension = extension;
+        mOutputFilePattern = config.getS3OutputFilePattern();
+    }
 
     private static String[] subArray(String[] array, int startIndex, int endIndex) {
         String[] result = new String[endIndex - startIndex + 1];
@@ -139,6 +167,11 @@ private String getLogFileBasename() {
     }
 
     public String getLogFilePath() {
+        // A configured output pattern takes precedence over the default layout.
+        if (StringUtils.isNotBlank(mOutputFilePattern)) {
+            return getLogFilePath(mOutputFilePattern);
+        }
+
         String basename = getLogFileBasename();
 
         ArrayList pathElements = new ArrayList();
@@ -147,6 +180,38 @@ public String getLogFilePath() {
 
         return StringUtils.join(pathElements, "/") + mExtension;
     }
+
+    /**
+     * Builds the path from the prefix and the pattern, with each {name}
+     * placeholder substituted, followed by the extension.
+     */
+    private String getLogFilePath(String pattern) {
+        List<String> pathElements = new ArrayList<String>();
+        pathElements.add(mPrefix);
+        pathElements.add(StrSubstitutor.replace(pattern, getValueMap(), "{", "}"));
+        return StringUtils.join(pathElements, "/") + mExtension;
+    }
+
+    /**
+     * Returns the values available to the placeholders of the output pattern.
+     */
+    private Map<String, String> getValueMap() {
+        if (mRandomHex == null) {
+            // Resolve once: a fresh value per call would make every
+            // getLogFilePath()/toString() invocation return a different path.
+            mRandomHex = getRandomHex();
+        }
+        Map<String, String> valueMap = new HashMap<String, String>();
+        valueMap.put("randomHex", mRandomHex);
+        // Guard against an empty partition list instead of failing on index 0.
+        valueMap.put("partition",
+                (mPartitions != null && mPartitions.length > 0) ? mPartitions[0] : "");
+        valueMap.put("topic", mTopic);
+        valueMap.put("generation", String.valueOf(mGeneration));
+        valueMap.put("kafkaPartition", String.valueOf(mKafkaPartition));
+        valueMap.put("fmOffset", String.format("%020d", mOffset));
+        return valueMap;
+    }
 
     public String getLogFileCrcPath() {
         String basename = "." + getLogFileBasename() + ".crc";
@@ -214,4 +279,13 @@ public int hashCode() {
     public String toString() {
         return getLogFilePath();
     }
+
+    /**
+     * Returns a random string of exactly four lowercase hexadecimal digits.
+     */
+    public static String getRandomHex() {
+        // Integer.toHexString drops leading zeros, so truncating its output could
+        // yield fewer than four characters; format a 16-bit value zero-padded instead.
+        return String.format("%04x", new Random().nextInt(0x10000));
+    }
 }
diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java
index 23621750c..aa81e77b7 100644
--- a/src/main/java/com/pinterest/secor/common/SecorConfig.java
+++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -238,6 +238,20 @@ public String getPerfTestTopicPrefix() {
     public String getZookeeperPath() {
         return getString("secor.zookeeper.path");
     }
+
+    /**
+     * Returns the value of {@code secor.s3.output_file_pattern}.
+     */
+    public String getS3OutputFilePattern() {
+        return getString("secor.s3.output_file_pattern");
+    }
+
+    /**
+     * Returns the value of {@code secor.partition.output_dt_format}.
+     */
+    public String getPartitionOutputDtFormat() {
+        return getString("secor.partition.output_dt_format");
+    }
 
     public boolean getBoolean(String name, boolean defaultValue) {
         return mProperties.getBoolean(name, defaultValue);
diff --git a/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java b/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java
new file mode 100644
index 000000000..ae31fc8fe
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.secor.parser; + +import java.text.SimpleDateFormat; +import java.util.Date; + +import net.minidev.json.JSONObject; +import net.minidev.json.JSONValue; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.message.Message; + +/** + * PatternDateMessageParser extracts the timestamp field (specified by 'message.timestamp.name') and parses it with the date pattern specified by 'message.timestamp.input.pattern'. + * The partition date format is read from the property 'secor.partition.output_dt_format'. + * + * This generic class can also cover the DateMessageParser functionality.
For ex: it will generate the same partition when the 'secor.partition.output_dt_format' property is set to "'dt='yyyy-MM-dd"
+ *
+ * @see <a href="http://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html">SimpleDateFormat</a>
+ *
+ * @author Santhosh Vasabhaktula (santhosh.vasabhaktula@gmail.com)
+ *
+ */
+public class PatternDateMessageParser extends MessageParser {
+    private static final Logger LOG = LoggerFactory.getLogger(PatternDateMessageParser.class);
+    protected static final String defaultDate = "1970-01-01";
+    protected static final String defaultFormatter = "yyyy-MM-dd";
+
+    public PatternDateMessageParser(SecorConfig config) {
+        super(config);
+    }
+
+    /** Returns one date partition; the default date when the timestamp is missing or cannot be parsed. */
+    @Override
+    public String[] extractPartitions(Message message) {
+        String[] result = { defaultDate };
+        // A blind cast would throw ClassCastException on array or scalar JSON payloads.
+        Object parsed = JSONValue.parse(message.getPayload());
+        if (!(parsed instanceof JSONObject)) {
+            return result;
+        }
+        Object fieldValue = ((JSONObject) parsed).get(mConfig.getMessageTimestampName());
+        Object inputPattern = mConfig.getMessageTimestampInputPattern();
+        if (fieldValue == null || inputPattern == null) {
+            return result;
+        }
+        try {
+            SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString());
+            SimpleDateFormat outputFormatter = new SimpleDateFormat(StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter));
+            result[0] = outputFormatter.format(inputFormatter.parse(fieldValue.toString()));
+        } catch (Exception e) {
+            LOG.warn("Impossible to convert date = " + fieldValue.toString()
+                    + " for the input pattern = " + inputPattern.toString()
+                    + ". Using date default=" + result[0], e);
+        }
+        return result;
+    }
+
+}
diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java
index 65cb1e617..2248238e2 100644
--- a/src/main/java/com/pinterest/secor/uploader/Uploader.java
+++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java
@@ -73,7 +73,8 @@ private Future upload(LogFilePath localPath) throws Exception {
                                              localPath.getGeneration(),
                                              localPath.getKafkaPartition(),
                                              localPath.getOffset(),
-                                             localPath.getExtension());
+                                             localPath.getExtension(),
+                                             mConfig);
         final String localLogFilename = localPath.getLogFilePath();
         final String s3LogFilename = s3Path.getLogFilePath();
         LOG.info("uploading file {} to {}", localLogFilename, s3LogFilename);