Skip to content

Commit

Permalink
1. Added additional properties for partition date format and s3 outpu…
Browse files Browse the repository at this point in the history
…t file naming pattern

2. Updated SecorConfig to read the new properties
3. Updated LogFilePath to take the s3 file naming pattern from properties
4. Created a new generic DateMessageParser which reads the output date format from property
5. Updated the Uploader to pass config object to LogFilePath
  • Loading branch information
SanthoshVasabhaktula committed Jul 28, 2015
1 parent 86956be commit 2cd03d0
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 8 deletions.
6 changes: 4 additions & 2 deletions src/main/config/secor.dev.partition.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ include=secor.dev.properties
secor.kafka.group=secor_partition

# Parser class that extracts s3 partitions from consumed messages.
secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser
secor.message.parser.class=com.pinterest.secor.parser.PatternDateMessageParser

# S3 path where sequence files are stored.
secor.s3.path=secor_dev/partition
secor.s3.path=secor_dev

# Local path where sequence files are stored before they are uploaded to s3.
secor.local.path=/tmp/secor_dev/message_logs/partition

# Port of the Ostrich server.
ostrich.port=9998

# Partition Date Output format. This is used along with PatternDateMessageParser.
secor.partition.output_dt_format=dd_MM_yyyy
19 changes: 14 additions & 5 deletions src/main/config/secor.dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ include=secor.common.properties
############

# Name of the s3 bucket where log files are stored.
secor.s3.bucket=
secor.s3.bucket=lpdev-ekstep

################
# END MUST SET #
Expand All @@ -17,8 +17,17 @@ kafka.seed.broker.port=9092
zookeeper.quorum=localhost:2181

# Upload policies.
# 10K
secor.max.file.size.bytes=10000
# 10 seconds
secor.max.file.age.seconds=10
# 100M
secor.max.file.size.bytes=1000000000
# 1 minute
secor.max.file.age.seconds=60

# Output file pattern excluding prefix. Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz.
# Available placeholders are
# topic - The topic name the data is being fetched
# partition - The partition name
# generation - Generation
# kafkaPartition - The kafka partition
# fmOffset - First Message offset in the file.
# randomHex - A 4 character random hex to append to the file name
secor.s3.output_file_pattern={randomHex}_{partition}_{topic}_{generation}.json
52 changes: 52 additions & 0 deletions src/main/java/com/pinterest/secor/common/LogFilePath.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
package com.pinterest.secor.common;

import com.pinterest.secor.message.ParsedMessage;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
* LogFilePath represents path of a log file. It contains convenience method for building and
Expand Down Expand Up @@ -49,6 +55,7 @@ public class LogFilePath {
private final int mKafkaPartition;
private final long mOffset;
private final String mExtension;
private String mOutputFilePattern;

public LogFilePath(String prefix, int generation, long lastCommittedOffset,
ParsedMessage message, String extension) {
Expand All @@ -71,6 +78,19 @@ public LogFilePath(String prefix, String topic, String[] partitions, int generat
mOffset = offset;
mExtension = extension;
}

public LogFilePath(String prefix, String topic, String[] partitions, int generation,
int kafkaPartition, long offset, String extension, SecorConfig config) {

mPrefix = prefix;
mTopic = topic;
mPartitions = partitions;
mGeneration = generation;
mKafkaPartition = kafkaPartition;
mOffset = offset;
mExtension = extension;
mOutputFilePattern = config.getS3OutputFilePattern();
}

private static String[] subArray(String[] array, int startIndex, int endIndex) {
String[] result = new String[endIndex - startIndex + 1];
Expand Down Expand Up @@ -139,6 +159,11 @@ private String getLogFileBasename() {
}

public String getLogFilePath() {

if (StringUtils.isNotBlank(mOutputFilePattern)) {
return getLogFilePath(mOutputFilePattern);
}

String basename = getLogFileBasename();

ArrayList<String> pathElements = new ArrayList<String>();
Expand All @@ -147,6 +172,27 @@ public String getLogFilePath() {

return StringUtils.join(pathElements, "/") + mExtension;
}

private String getLogFilePath(String pattern) {

List<String> pathElements = new ArrayList<String>();
pathElements.add(mPrefix);
pathElements.add(StrSubstitutor.replace(pattern, getValueMap(), "{", "}"));
System.out.println("Path:" + StringUtils.join(pathElements, "/") + mExtension);
return StringUtils.join(pathElements, "/") + mExtension;
}

private Map<String, String> getValueMap() {

Map<String, String> valueMap = new HashMap<String, String>();
valueMap.put("randomHex", getRandomHex());
valueMap.put("partition", mPartitions[0]);
valueMap.put("topic", mTopic);
valueMap.put("generation", mGeneration + "");
valueMap.put("kafkaPartition", mKafkaPartition + "");
valueMap.put("fmOffset", String.format("%020d", mOffset));
return valueMap;
}

public String getLogFileCrcPath() {
String basename = "." + getLogFileBasename() + ".crc";
Expand Down Expand Up @@ -214,4 +260,10 @@ public int hashCode() {
public String toString() {
return getLogFilePath();
}

public static String getRandomHex() {

Random random = new Random();
return StringUtils.substring(Integer.toHexString(random.nextInt()), 0, 4);
}
}
8 changes: 8 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ public String getPerfTestTopicPrefix() {
public String getZookeeperPath() {
return getString("secor.zookeeper.path");
}

public String getS3OutputFilePattern() {
return getString("secor.s3.output_file_pattern");
}

public String getPartitionOutputDtFormat() {
return getString("secor.partition.output_dt_format");
}

public boolean getBoolean(String name, boolean defaultValue) {
return mProperties.getBoolean(name, defaultValue);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.secor.parser;

import java.text.SimpleDateFormat;
import java.util.Date;

import net.minidev.json.JSONObject;
import net.minidev.json.JSONValue;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;

/**
* PatternDateMessageParser extracts timestamp field (specified by 'message.timestamp.name') and the date pattern (specified by 'message.timestamp.input.pattern').
* The output file pattern is fetched from the property 'secor.partition.output_dt_format'.
*
* This generic class can even handle the DateMessageParse functionality. For ex: it will generate the same partition when the 'secor.partition.output_dt_format' property is set to "'dt='yyyy-MM-dd"
*
* @see http://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html
*
* @author Santhosh Vasabhaktula ([email protected])
*
*/
public class PatternDateMessageParser extends MessageParser {
private static final Logger LOG = LoggerFactory.getLogger(PatternDateMessageParser.class);
protected static final String defaultDate = "1970-01-01";
protected static final String defaultFormatter = "yyyy-MM-dd";

public PatternDateMessageParser(SecorConfig config) {
super(config);
}

@Override
public String[] extractPartitions(Message message) {
JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload());
String result[] = { defaultDate };

if (jsonObject != null) {
Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName());
Object inputPattern = mConfig.getMessageTimestampInputPattern();
if (fieldValue != null && inputPattern != null) {
try {
SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString());
SimpleDateFormat outputFormatter = new SimpleDateFormat(StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter));
Date dateFormat = inputFormatter.parse(fieldValue.toString());
result[0] = outputFormatter.format(dateFormat);
return result;
} catch (Exception e) {
LOG.warn("Impossible to convert date = " + fieldValue.toString()
+ " for the input pattern = " + inputPattern.toString()
+ ". Using date default=" + result[0]);
}
}
}

return result;
}

}
3 changes: 2 additions & 1 deletion src/main/java/com/pinterest/secor/uploader/Uploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private Future<?> upload(LogFilePath localPath) throws Exception {
localPath.getGeneration(),
localPath.getKafkaPartition(),
localPath.getOffset(),
localPath.getExtension());
localPath.getExtension(),
mConfig);
final String localLogFilename = localPath.getLogFilePath();
final String s3LogFilename = s3Path.getLogFilePath();
LOG.info("uploading file {} to {}", localLogFilename, s3LogFilename);
Expand Down

0 comments on commit 2cd03d0

Please sign in to comment.