diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties
index f6c9c2f9b..1c296212d 100644
--- a/src/main/config/secor.common.properties
+++ b/src/main/config/secor.common.properties
@@ -86,6 +86,22 @@ secor.offsets.per.partition=10000000
 # How long does it take for secor to forget a topic partition. Applies to stats generation only.
 secor.topic_partition.forget.seconds=600
 
+# Set the partitioner to use hourly partitioning.
+# By default, the partitioner partitions daily, so the data is
+# written into
+#     s3n://.../topic/dt=2015-07-07/
+# If this parameter is set to true, the data is written into
+#     s3n://.../topic/dt=2015-07-07/hr=02
+# The hour folder ranges from 00 to 23.
+# partitioner.granularity.hour=true
+
+# During partition finalization, the finalizer starts from the last
+# time partition (e.g. dt=2015-07-17) and traverses backwards for n
+# partition periods (e.g. dt=2015-07-16, dt=2015-07-15, ...).
+# This parameter controls how many partition periods to traverse back.
+# The default is 10.
+# secor.finalizer.lookback.periods=10
+
 # If greater than 0, upon startup Secor will clean up directories and files under secor.local.path
 # that are older than this value.
 secor.local.log.delete.age.hours=-1
diff --git a/src/main/config/secor.dev.hr.partition.properties b/src/main/config/secor.dev.hr.partition.properties
new file mode 100644
index 000000000..95c7017c2
--- /dev/null
+++ b/src/main/config/secor.dev.hr.partition.properties
@@ -0,0 +1,11 @@
+include=secor.dev.properties
+
+secor.kafka.group=secor_hr_partition
+secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser
+
+secor.s3.path=secor_dev/hr_partition
+secor.local.path=/tmp/secor_dev/message_logs/hr_partition
+
+partitioner.granularity.hour=true
+
+ostrich.port=9998
diff --git a/src/main/config/secor.dev.properties b/src/main/config/secor.dev.properties
index b1ad32099..ec30afa71 100644
--- a/src/main/config/secor.dev.properties
+++ b/src/main/config/secor.dev.properties
@@ -19,6 +19,6 @@ zookeeper.quorum=localhost:2181
 # Upload policies.
 # 10K
 secor.max.file.size.bytes=10000
-# 1 minute
-secor.max.file.age.seconds=60
+# 10 seconds
+secor.max.file.age.seconds=10
diff --git a/src/main/config/secor.prod.properties b/src/main/config/secor.prod.properties
index 0801a5c99..56047cb57 100644
--- a/src/main/config/secor.prod.properties
+++ b/src/main/config/secor.prod.properties
@@ -37,5 +37,6 @@ zookeeper.quorum=
 # 200MB
 secor.max.file.size.bytes=200000000
 # 1 hour
+# For hourly ingestion/finalization, set this property to a smaller value, e.g. 1800.
 secor.max.file.age.seconds=3600
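To make the dt=/hr= folder convention above concrete, here is a standalone sketch that derives both folder names from a message timestamp, using the same UTC formats the parser adopts later in this patch. The class name and the hard-coded timestamp are illustrative only, not part of the change:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical illustration of the dt=/hr= folder convention; not patch code.
public class PartitionPathSketch {
    public static void main(String[] args) {
        SimpleDateFormat dtFormatter = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat hrFormatter = new SimpleDateFormat("HH");
        dtFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        hrFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));

        Date timestamp = new Date(1436234400000L);  // 2015-07-07 02:00:00 UTC
        // Daily granularity (the default): s3n://.../topic/dt=2015-07-07/
        System.out.println("dt=" + dtFormatter.format(timestamp));
        // Hourly granularity (partitioner.granularity.hour=true):
        //     s3n://.../topic/dt=2015-07-07/hr=02
        System.out.println("dt=" + dtFormatter.format(timestamp)
                + "/hr=" + hrFormatter.format(timestamp));
    }
}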
diff --git a/src/main/java/com/pinterest/secor/common/LogFilePath.java b/src/main/java/com/pinterest/secor/common/LogFilePath.java
index 367dc03f4..753288c0d 100644
--- a/src/main/java/com/pinterest/secor/common/LogFilePath.java
+++ b/src/main/java/com/pinterest/secor/common/LogFilePath.java
@@ -42,13 +42,13 @@
  * @author Pawel Garbacki (pawel@pinterest.com)
  */
 public class LogFilePath {
-    private String mPrefix;
-    private String mTopic;
-    private String[] mPartitions;
-    private int mGeneration;
-    private int mKafkaPartition;
-    private long mOffset;
-    private String mExtension;
+    private final String mPrefix;
+    private final String mTopic;
+    private final String[] mPartitions;
+    private final int mGeneration;
+    private final int mKafkaPartition;
+    private final long mOffset;
+    private final String mExtension;
 
     public LogFilePath(String prefix, int generation, long lastCommittedOffset,
                        ParsedMessage message, String extension) {
diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java
index 2e03856f2..23621750c 100644
--- a/src/main/java/com/pinterest/secor/common/SecorConfig.java
+++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -211,6 +211,10 @@ public String getMessageTimestampInputPattern() {
         return getString("message.timestamp.input.pattern");
     }
 
+    public int getFinalizerLookbackPeriods() {
+        return getInt("secor.finalizer.lookback.periods", 10);
+    }
+
     public String getHivePrefix() {
         return getString("secor.hive.prefix");
     }
@@ -235,6 +239,10 @@ public String getZookeeperPath() {
         return getString("secor.zookeeper.path");
     }
 
+    public boolean getBoolean(String name, boolean defaultValue) {
+        return mProperties.getBoolean(name, defaultValue);
+    }
+
     private void checkProperty(String name) {
         if (!mProperties.containsKey(name)) {
             throw new RuntimeException("Failed to find required configuration option '" +
@@ -252,6 +260,10 @@ private int getInt(String name) {
         return mProperties.getInt(name);
     }
 
+    private int getInt(String name, int defaultValue) {
+        return mProperties.getInt(name, defaultValue);
+    }
+
     private long getLong(String name) {
         return mProperties.getLong(name);
     }
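Unlike checkProperty(), which throws on a missing key, the new getters fall back to a supplied default. A quick sketch of that behavior, assuming mProperties is commons-configuration's PropertiesConfiguration (which these delegated signatures suggest):

import org.apache.commons.configuration.PropertiesConfiguration;

// Standalone sketch of the defaulting lookups behind the new getters.
public class DefaultLookupSketch {
    public static void main(String[] args) {
        PropertiesConfiguration properties = new PropertiesConfiguration();
        // Neither key is set, so the supplied defaults come back:
        System.out.println(properties.getInt("secor.finalizer.lookback.periods", 10));     // 10
        System.out.println(properties.getBoolean("partitioner.granularity.hour", false));  // false
    }
}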
localhost:9092") + .hasArg() + .withArgName("") + .withType(String.class) + .create("broker")); + options.addOption(OptionBuilder.withLongOpt("timeshift") + .withDescription("message timestamp adjustment in seconds, it will be deducted" + + " from current time") + .hasArg() + .withArgName("") + .withType(Number.class) + .create("timeshift")); CommandLineParser parser = new GnuParser(); return parser.parse(options, args); @@ -74,9 +87,13 @@ public static void main(String[] args) { String topic = commandLine.getOptionValue("topic"); int messages = ((Number) commandLine.getParsedOptionValue("messages")).intValue(); int producers = ((Number) commandLine.getParsedOptionValue("producers")).intValue(); + String broker = commandLine.getOptionValue("broker"); String type = commandLine.getOptionValue("type"); + Number timeshiftNumber = ((Number)commandLine.getParsedOptionValue("timeshift")); + int timeshift = timeshiftNumber == null ? 0 : timeshiftNumber.intValue(); for (int i = 0; i < producers; ++i) { - TestLogMessageProducer producer = new TestLogMessageProducer(topic, messages, type); + TestLogMessageProducer producer = new TestLogMessageProducer( + topic, messages, type, broker, timeshift); producer.start(); } } catch (Throwable t) { @@ -84,4 +101,4 @@ public static void main(String[] args) { System.exit(1); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java index b826db3ae..58e12fa68 100644 --- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java +++ b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java @@ -25,12 +25,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Stack; /** * Partition finalizer writes _SUCCESS files to date partitions that very likely won't be receiving @@ -41,12 +39,13 @@ public class PartitionFinalizer { private static final Logger LOG = LoggerFactory.getLogger(PartitionFinalizer.class); - private SecorConfig mConfig; - private ZookeeperConnector mZookeeperConnector; - private TimestampedMessageParser mMessageParser; - private KafkaClient mKafkaClient; - private QuboleClient mQuboleClient; - private String mFileExtension; + private final SecorConfig mConfig; + private final ZookeeperConnector mZookeeperConnector; + private final TimestampedMessageParser mMessageParser; + private final KafkaClient mKafkaClient; + private final QuboleClient mQuboleClient; + private final String mFileExtension; + private final int mLookbackPeriods; public PartitionFinalizer(SecorConfig config) throws Exception { mConfig = config; @@ -61,154 +60,120 @@ public PartitionFinalizer(SecorConfig config) throws Exception { } else { mFileExtension = ""; } + mLookbackPeriods = config.getFinalizerLookbackPeriods(); + LOG.info("Lookback periods: " + mLookbackPeriods); } - private long getLastTimestampMillis(TopicPartition topicPartition) throws Exception { - Message message = mKafkaClient.getLastMessage(topicPartition); - if (message == null) { - // This will happen if no messages have been posted to the given topic partition. 
- LOG.error("No message found for topic {} partition {}" + topicPartition.getTopic(), topicPartition.getPartition()); - return -1; - } - return mMessageParser.extractTimestampMillis(message); - } - - private long getLastTimestampMillis(String topic) throws Exception { + private String[] getFinalizedUptoPartitions(String topic) throws Exception { final int numPartitions = mKafkaClient.getNumPartitions(topic); - long max_timestamp = Long.MIN_VALUE; + List lastMessages = new ArrayList(numPartitions); + List committedMessages = new ArrayList(numPartitions); for (int partition = 0; partition < numPartitions; ++partition) { TopicPartition topicPartition = new TopicPartition(topic, partition); - long timestamp = getLastTimestampMillis(topicPartition); - if (timestamp > max_timestamp) { - max_timestamp = timestamp; + Message lastMessage = mKafkaClient.getLastMessage(topicPartition); + Message committedMessage = mKafkaClient.getCommittedMessage(topicPartition); + if (lastMessage == null || committedMessage == null) { + // This will happen if no messages have been posted to the given topic partition. + LOG.error("For topic {} partition {}, lastMessage: {}, commmitted: {}", + topicPartition.getTopic(), topicPartition.getPartition(), + lastMessage, committedMessage); + continue; } + lastMessages.add(lastMessage); + committedMessages.add(committedMessage); } - if (max_timestamp == Long.MIN_VALUE) { - return -1; - } - return max_timestamp; + return mMessageParser.getFinalizedUptoPartitions(lastMessages, committedMessages); } - private long getCommittedTimestampMillis(TopicPartition topicPartition) throws Exception { - Message message = mKafkaClient.getCommittedMessage(topicPartition); - if (message == null) { - LOG.error("No message found for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition()); - return -1; - } - return mMessageParser.extractTimestampMillis(message); - } + private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throws Exception { + final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path(); - private long getCommittedTimestampMillis(String topic) throws Exception { - final int numPartitions = mKafkaClient.getNumPartitions(topic); - long minTimestamp = Long.MAX_VALUE; - for (int partition = 0; partition < numPartitions; ++partition) { - TopicPartition topicPartition = new TopicPartition(topic, partition); - long timestamp = getCommittedTimestampMillis(topicPartition); - if (timestamp == -1) { - return -1; - } else { - if (timestamp < minTimestamp) { - minTimestamp = timestamp; + LOG.info("Finalize up to (but not include) {}, dim: {}", + uptoPartitions, uptoPartitions.length); + + String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions); + Stack toBeFinalized = new Stack(); + // Walk backwards to collect all partitions which are previous to the upTo partition + // Do not include the upTo partition + // Stop at the first partition which already have the SUCCESS file + for (int i = 0; i < mLookbackPeriods; i++) { + LOG.info("Looking for partition: " + Arrays.toString(previous)); + LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous, + mConfig.getGeneration(), 0, 0, mFileExtension); + String logFileDir = logFilePath.getLogFileDir(); + if (FileUtil.exists(logFileDir)) { + String successFilePath = logFileDir + "/_SUCCESS"; + if (FileUtil.exists(successFilePath)) { + LOG.info( + "SuccessFile exist already, short circuit return. 
" + successFilePath); + break; } + LOG.info("Folder {} exists and ready to be finalized.", logFileDir); + toBeFinalized.push(previous); + } else { + LOG.info("Folder {} doesn't exist, skip", logFileDir); } + previous = mMessageParser.getPreviousPartitions(previous); } - if (minTimestamp == Long.MAX_VALUE) { - return -1; + + LOG.info("To be finalized partitions: {}", toBeFinalized); + if (toBeFinalized.isEmpty()) { + LOG.warn("There is no partitions to be finalized."); + return; } - return minTimestamp; - } - private NavigableSet getPartitions(String topic) throws IOException, ParseException { - final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path(); - String[] partitions = {"dt="}; - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, - mConfig.getGeneration(), 0, 0, mFileExtension); - String parentDir = logFilePath.getLogFileParentDir(); - String[] partitionDirs = FileUtil.list(parentDir); - Pattern pattern = Pattern.compile(".*/dt=(\\d\\d\\d\\d-\\d\\d-\\d\\d)$"); - TreeSet result = new TreeSet(); - for (String partitionDir : partitionDirs) { - Matcher matcher = pattern.matcher(partitionDir); - if (matcher.find()) { - String date = matcher.group(1); - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); - format.setTimeZone(TimeZone.getTimeZone("UTC")); - Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - calendar.setTime(format.parse(date)); - result.add(calendar); + // Now walk forward the collected partitions to do the finalization + // Note we are deliberately walking backwards and then forwards to make sure we don't + // end up in a situation that a later date partition is finalized and then the system + // crashes (which creates unfinalized partition folders in between) + while (!toBeFinalized.isEmpty()) { + String[] current = toBeFinalized.pop(); + LOG.info("Finalizing partition: " + Arrays.toString(current)); + // We only perform hive registration on the last dimension of the partition array + // i.e. only do hive registration for the hourly folder, but not for the daily + if (uptoPartitions.length == current.length) { + try { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < current.length; i++) { + String par = current[i]; + // We expect the partition array in the form of key=value if + // they need to go through hive registration + String[] parts = par.split("="); + assert parts.length == 2 : "wrong partition format: " + par; + if (i > 0) { + sb.append(","); + } + sb.append(parts[0]); + sb.append("='"); + sb.append(parts[1]); + sb.append("'"); + } + LOG.info("Hive partition string: " + sb); + String hivePrefix = null; + try { + hivePrefix = mConfig.getHivePrefix(); + } catch (RuntimeException ex) { + LOG.warn("HivePrefix is not defined. 
Skip hive registration"); + } + if (hivePrefix != null) { + mQuboleClient.addPartition(hivePrefix + topic, sb.toString()); + } + } catch (Exception e) { + LOG.error("failed to finalize topic " + topic, e); + continue; + } } - } - return result; - } - private void finalizePartitionsUpTo(String topic, Calendar calendar) throws IOException, - ParseException, InterruptedException { - NavigableSet partitionDates = - getPartitions(topic).headSet(calendar, true).descendingSet(); - final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path(); - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); - format.setTimeZone(TimeZone.getTimeZone("UTC")); - for (Calendar partition : partitionDates) { - String partitionStr = format.format(partition.getTime()); - String[] partitions = {"dt=" + partitionStr}; - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, + // Generate the SUCCESS file at the end + LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current, mConfig.getGeneration(), 0, 0, mFileExtension); String logFileDir = logFilePath.getLogFileDir(); - assert FileUtil.exists(logFileDir) : "FileUtil.exists(" + logFileDir + ")"; String successFilePath = logFileDir + "/_SUCCESS"; - if (FileUtil.exists(successFilePath)) { - return; - } - try { - mQuboleClient.addPartition(mConfig.getHivePrefix() + topic, "dt='" + partitionStr + "'"); - } catch (Exception e) { - LOG.error("failed to finalize topic {} partition dt={}", topic , partitionStr, e); - continue; - } + LOG.info("touching file {}", successFilePath); FileUtil.touch(successFilePath); } - } - /** - * Get finalized timestamp for a given topic partition. Finalized timestamp is the current time - * if the last offset for that topic partition has been committed earlier than an hour ago. - * Otherwise, finalized timestamp is the committed timestamp. - * - * @param topicPartition The topic partition for which we want to compute the finalized - * timestamp. - * @return The finalized timestamp for the topic partition. 
- * @throws Exception - */ - private long getFinalizedTimestampMillis(TopicPartition topicPartition) throws Exception { - long lastTimestamp = getLastTimestampMillis(topicPartition); - long committedTimestamp = getCommittedTimestampMillis(topicPartition); - long now = System.currentTimeMillis(); - if (lastTimestamp == committedTimestamp && (now - lastTimestamp) > 3600 * 1000) { - return now; - } - return committedTimestamp; - } - - private long getFinalizedTimestampMillis(String topic) throws Exception { - final int numPartitions = mKafkaClient.getNumPartitions(topic); - long minTimestamp = Long.MAX_VALUE; - for (int partition = 0; partition < numPartitions; ++partition) { - TopicPartition topicPartition = new TopicPartition(topic, partition); - long timestamp = getFinalizedTimestampMillis(topicPartition); - LOG.info("finalized timestamp for topic {} partition {} is {}", topic, partition, timestamp); - if (timestamp == -1) { - return -1; - } else { - if (timestamp < minTimestamp) { - minTimestamp = timestamp; - } - } - } - if (minTimestamp == Long.MAX_VALUE) { - return -1; - } - return minTimestamp; } public void finalizePartitions() throws Exception { @@ -218,15 +183,10 @@ public void finalizePartitions() throws Exception { LOG.info("skipping topic {}", topic); } else { LOG.info("finalizing topic {}", topic); - long finalizedTimestampMillis = getFinalizedTimestampMillis(topic); - LOG.info("finalized timestamp for topic {} is {}", topic , finalizedTimestampMillis); - if (finalizedTimestampMillis != -1) { - Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - calendar.setTimeInMillis(finalizedTimestampMillis); - // Introduce a lag of one day and one hour. - calendar.add(Calendar.HOUR, -1); - calendar.add(Calendar.DAY_OF_MONTH, -1); - finalizePartitionsUpTo(topic, calendar); + String[] partitions = getFinalizedUptoPartitions(topic); + LOG.info("finalized timestamp for topic {} is {}", topic , partitions); + if (partitions != null) { + finalizePartitionsUpTo(topic, partitions); } } } diff --git a/src/main/java/com/pinterest/secor/parser/Partitioner.java b/src/main/java/com/pinterest/secor/parser/Partitioner.java new file mode 100644 index 000000000..6253758e0 --- /dev/null +++ b/src/main/java/com/pinterest/secor/parser/Partitioner.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.secor.parser; + +import com.pinterest.secor.message.Message; + +import java.util.List; + +/** + * The Partitioner knows when to finalize a file folder partition. + * + * A file folder partition (e.g. dt=2015-07-07) can be finalized when all + * messages in that date arrived. The caller (PartitionFinalizer) will do the + * finalization work (e.g. 
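The ordering above is the heart of the finalizer change: candidates are collected newest-to-oldest on a stack, collection stops at the first folder that already has a _SUCCESS file, and the pops then finalize oldest-first, so a crash can never strand an unfinalized folder behind a finalized one. A compilable sketch of just that ordering; previousOf(), folderExists() and hasSuccessFile() are hypothetical stubs standing in for Partitioner.getPreviousPartitions() and the FileUtil checks:

import java.util.Stack;

// Sketch of the walk-backwards-then-forwards order in finalizePartitionsUpTo().
public class FinalizeOrderSketch {
    static String previousOf(String dt) {  // dt=2015-07-17 -> dt=2015-07-16, etc.
        int day = Integer.parseInt(dt.substring(dt.length() - 2));
        return dt.substring(0, dt.length() - 2) + String.format("%02d", day - 1);
    }
    static boolean folderExists(String dt) { return true; }                  // stub
    static boolean hasSuccessFile(String dt) { return dt.endsWith("12"); }   // stub

    public static void main(String[] args) {
        int lookbackPeriods = 10;  // secor.finalizer.lookback.periods
        String previous = previousOf("dt=2015-07-17");  // the up-to partition is excluded
        Stack<String> toBeFinalized = new Stack<String>();
        for (int i = 0; i < lookbackPeriods; i++) {
            if (folderExists(previous)) {
                if (hasSuccessFile(previous)) {
                    break;  // already finalized, so everything older is finalized too
                }
                toBeFinalized.push(previous);
            }
            previous = previousOf(previous);
        }
        while (!toBeFinalized.isEmpty()) {
            // Pops oldest first: dt=2015-07-13, 14, 15, 16.
            System.out.println("finalize " + toBeFinalized.pop());
        }
    }
}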
diff --git a/src/main/java/com/pinterest/secor/parser/Partitioner.java b/src/main/java/com/pinterest/secor/parser/Partitioner.java
new file mode 100644
index 000000000..6253758e0
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/parser/Partitioner.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.secor.parser;
+
+import com.pinterest.secor.message.Message;
+
+import java.util.List;
+
+/**
+ * The Partitioner knows when to finalize a file folder partition.
+ *
+ * A file folder partition (e.g. dt=2015-07-07) can be finalized when all
+ * messages in that date have arrived. The caller (PartitionFinalizer) does the
+ * finalization work (e.g. generates the _SUCCESS file, performs hive registration).
+ *
+ * The partitioner provides a method to calculate the range of file folder
+ * partitions to be finalized, and a method to iterate through that range.
+ *
+ * The caller will first provide a list of last-consumed messages for a given
+ * kafka topic and call #getFinalizedUptoPartitions to get the finalized-up-to
+ * partition, and will then walk backwards by calling #getPreviousPartitions to
+ * collect all the previous partitions which are ready to be finalized.
+ *
+ * Note that the finalize-up-to partition itself is not included in the range
+ * of partitions to be finalized.
+ *
+ * The caller might repeat this loop multiple times when the filesystem
+ * partition is multi-dimensional (e.g. [dt=2015-07-07, hr=05]): it will loop
+ * once for the hourly folder finalization and another time for the daily folder.
+ *
+ * Note that although we use daily/hourly partitions to illustrate the use of
+ * the partitioner, it is by no means restricted to timestamp-based
+ * partitioning; it should also be able to work with offset-based partitioning,
+ * as long as we establish an iteration order within those partitions.
+ *
+ * @author Henry Cai (hcai@pinterest.com)
+ */
+public interface Partitioner {
+    /**
+     * Calculates the partition to finalize-up-to from a list of last-consumed
+     * messages and a list of last-enqueued messages.
+     *
+     * For each kafka topic/partition of a given topic, the caller provides
+     * two messages:
+     *   * lastMessage: the last message at the tail of the kafka queue
+     *   * committedMessage: the message secor consumed and committed
+     * Iterating over all the kafka topic partitions for the given topic, the
+     * caller gathers the above two messages into two lists.
+     *
+     * The Partitioner compares the messages from all kafka partitions to see
+     * which one is the earliest to finalize up to. The partitioner normally
+     * uses the timestamp of the committedMessage to decide the finalize time.
+     * But for some slow topics where no new messages have arrived for a while
+     * (i.e. lastMessage == committedMessage), the partitioner can use the
+     * current time as the finalize time.
+     *
+     * Note that the up-to partition itself is not included in the range to be
+     * finalized. For example, when the last message is in 2015-07-07, 7/7
+     * itself is not complete yet.
+     *
+     * Note also that the partitioner might want to adjust the finalize time
+     * down to allow a safety lag for late-arriving messages, e.g. adding one
+     * extra hour of lag.
+     *
+     * @param lastMessages the last messages at the tail of the queue
+     * @param committedMessages the messages secor consumed and committed
+     *
+     * @return a String array representing the file folder partition to finalize up to
+     */
+    String[] getFinalizedUptoPartitions(List<Message> lastMessages,
+                                        List<Message> committedMessages) throws Exception;
+
+    /**
+     * Gets the previous partition out of the incoming partition.
+     * E.g. for ["dt=2015-07-07","hr=05"], it returns ["dt=2015-07-07","hr=04"].
+     *
+     * Note that the implementation might return the previous sequence in daily/mixed forms, e.g.
+     *   [dt=2015-07-07, hr=01]
+     *   [dt=2015-07-07, hr=00]
+     *   [dt=2015-07-06]          <-- dt folder in between
+     *   [dt=2015-07-06, hr=23]
+     *   [dt=2015-07-06, hr=22]
+     *
+     * @param partition the current partition
+     * @return the partition immediately preceding the given one
+     */
+    String[] getPreviousPartitions(String[] partition) throws Exception;
+}
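As the closing javadoc note says, nothing in this contract is timestamp-specific. For instance, a toy implementation over offset buckets would satisfy the same interface; the 1000-offset bucket size and the class itself are invented for illustration, and it assumes Message#getOffset() and non-empty input lists:

package com.pinterest.secor.parser;

import com.pinterest.secor.message.Message;

import java.util.List;

// Illustrative only: a toy offset-bucket Partitioner; not part of this patch.
public class OffsetPartitionerSketch implements Partitioner {
    private static final long BUCKET = 1000L;

    @Override
    public String[] getFinalizedUptoPartitions(List<Message> lastMessages,
                                               List<Message> committedMessages) {
        // The slowest kafka partition determines how far we can finalize.
        long minOffset = Long.MAX_VALUE;
        for (Message committed : committedMessages) {
            minOffset = Math.min(minOffset, committed.getOffset());
        }
        return new String[] { "offset=" + (minOffset / BUCKET) * BUCKET };
    }

    @Override
    public String[] getPreviousPartitions(String[] partition) {
        long start = Long.parseLong(partition[0].split("=")[1]);
        return new String[] { "offset=" + (start - BUCKET) };
    }
}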
diff --git a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
index 74e4b0a02..c0f330044 100644
--- a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
@@ -19,22 +19,45 @@
 import com.pinterest.secor.common.SecorConfig;
 import com.pinterest.secor.message.Message;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.TimeZone;
 
-public abstract class TimestampedMessageParser extends MessageParser {
+public abstract class TimestampedMessageParser extends MessageParser implements Partitioner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TimestampedMessageParser.class);
+
+    private static final long HOUR_IN_MILLIS = 3600L * 1000L;
+    private static final long DAY_IN_MILLIS = 3600L * 24 * 1000L;
+
+    private static final SimpleDateFormat mDtFormatter = new SimpleDateFormat("yyyy-MM-dd");
+    private static final SimpleDateFormat mHrFormatter = new SimpleDateFormat("HH");
+    private static final SimpleDateFormat mDtHrFormatter = new SimpleDateFormat("yyyy-MM-dd-HH");
 
-    private SimpleDateFormat mFormatter;
+    static {
+        mDtFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        mHrFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        mDtHrFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+
+    private final boolean mUsingHourly;
 
     public TimestampedMessageParser(SecorConfig config) {
         super(config);
-        mFormatter = new SimpleDateFormat("yyyy-MM-dd");
-        mFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        mUsingHourly = usingHourly(config);
+        LOG.info("UsingHourly: {}", mUsingHourly);
     }
 
     public abstract long extractTimestampMillis(final Message message) throws Exception;
 
+    static boolean usingHourly(SecorConfig config) {
+        return config.getBoolean("partitioner.granularity.hour", false);
+    }
+
     protected static long toMillis(final long timestamp) {
         final long nanosecondDivider = (long) Math.pow(10, 9 + 9);
         final long millisecondDivider = (long) Math.pow(10, 9 + 3);
@@ -49,12 +72,101 @@ protected static long toMillis(final long timestamp) {
         return timestampMillis;
     }
 
+    protected String[] generatePartitions(long timestampMillis, boolean usingHourly)
+            throws Exception {
+        Date date = new Date(timestampMillis);
+        String dt = "dt=" + mDtFormatter.format(date);
+        String hr = "hr=" + mHrFormatter.format(date);
+        if (usingHourly) {
+            return new String[]{dt, hr};
+        } else {
+            return new String[]{dt};
+        }
+    }
+
+    protected long parsePartitions(String[] partitions) throws Exception {
+        String dtValue = partitions[0].split("=")[1];
+        String hrValue = partitions.length > 1 ? partitions[1].split("=")[1] : "00";
+        String value = dtValue + "-" + hrValue;
+        Date date = mDtHrFormatter.parse(value);
+        return date.getTime();
+    }
+
     @Override
     public String[] extractPartitions(Message message) throws Exception {
         // Date constructor takes milliseconds since epoch.
         long timestampMillis = extractTimestampMillis(message);
-        Date date = new Date(timestampMillis);
-        String result[] = {"dt=" + mFormatter.format(date)};
-        return result;
+        return generatePartitions(timestampMillis, mUsingHourly);
     }
+
+    private long getFinalizedTimestampMillis(Message lastMessage,
+                                             Message committedMessage) throws Exception {
+        long lastTimestamp = extractTimestampMillis(lastMessage);
+        long committedTimestamp = extractTimestampMillis(committedMessage);
+        long now = System.currentTimeMillis();
+        if (lastTimestamp == committedTimestamp && (now - lastTimestamp) > 3600 * 1000) {
+            LOG.info("No new messages coming, using the current time: " + now);
+            return now;
+        }
+        return committedTimestamp;
+    }
+
+    @Override
+    public String[] getFinalizedUptoPartitions(List<Message> lastMessages,
+                                               List<Message> committedMessages) throws Exception {
+        if (lastMessages == null || committedMessages == null) {
+            LOG.error("Either lastMessages: {} or committedMessages: {} is null", lastMessages,
+                committedMessages);
+            return null;
+        }
+        assert lastMessages.size() == committedMessages.size();
+
+        long minMillis = Long.MAX_VALUE;
+        for (int i = 0; i < lastMessages.size(); i++) {
+            long millis = getFinalizedTimestampMillis(lastMessages.get(i),
+                committedMessages.get(i));
+            if (millis < minMillis) {
+                minMillis = millis;
+            }
+        }
+        if (minMillis == Long.MAX_VALUE) {
+            LOG.error("No valid timestamps among messages: {} and {}", lastMessages,
+                committedMessages);
+            return null;
+        }
+
+        // Add a safety lag for late-arriving messages.
+        minMillis -= 3600L * 1000L;
+        LOG.info("adjusted millis {}", minMillis);
+        return generatePartitions(minMillis, mUsingHourly);
+    }
+
+    @Override
+    public String[] getPreviousPartitions(String[] partitions) throws Exception {
+        long millis = parsePartitions(partitions);
+        boolean usingHourly = mUsingHourly;
+        if (mUsingHourly && millis % DAY_IN_MILLIS == 0) {
+            // On the day boundary: if the current partition is [dt=07-07, hr=00], the previous
+            // one is [dt=07-06]; if the current one is [dt=07-06], the previous one is
+            // [dt=07-06, hr=23].
+            // So we would return, in order:
+            //   dt=07-07, hr=01
+            //   dt=07-07, hr=00
+            //   dt=07-06
+            //   dt=07-06, hr=23
+            if (partitions.length == 2) {
+                usingHourly = false;
+                millis -= DAY_IN_MILLIS;
+            } else {
+                usingHourly = true;
+                millis += DAY_IN_MILLIS;
+                millis -= HOUR_IN_MILLIS;
+            }
+        } else {
+            long delta = mUsingHourly ? HOUR_IN_MILLIS : DAY_IN_MILLIS;
+            millis -= delta;
+        }
+        return generatePartitions(millis, usingHourly);
+    }
+
 }
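The day-boundary branch in getPreviousPartitions() is the subtlest part of this change. The standalone sketch below re-implements the same millisecond arithmetic (runnable without a SecorConfig) and prints the mixed hourly/daily sequence the comment describes:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Re-implementation of the hourly getPreviousPartitions() arithmetic above.
// Starting from [dt=2015-07-07, hr=01] it prints hr=00, then the
// dt=2015-07-06 day folder, then dt=2015-07-06 hr=23 and hr=22.
public class PreviousPartitionSketch {
    private static final long HOUR_IN_MILLIS = 3600L * 1000L;
    private static final long DAY_IN_MILLIS = 24L * HOUR_IN_MILLIS;
    private static final SimpleDateFormat DT = new SimpleDateFormat("yyyy-MM-dd");
    private static final SimpleDateFormat HR = new SimpleDateFormat("HH");
    static {
        DT.setTimeZone(TimeZone.getTimeZone("UTC"));
        HR.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    private static void print(long millis, boolean hourly) {
        Date date = new Date(millis);
        System.out.println(hourly
                ? "[dt=" + DT.format(date) + ", hr=" + HR.format(date) + "]"
                : "[dt=" + DT.format(date) + "]");
    }

    public static void main(String[] args) {
        long millis = 1436230800000L;  // 2015-07-07 01:00 UTC, i.e. [dt=2015-07-07, hr=01]
        boolean hourly = true;         // the current partition has an hr= dimension
        for (int i = 0; i < 4; i++) {
            if (millis % DAY_IN_MILLIS == 0) {
                if (hourly) {          // [dt, hr=00] -> previous day's dt folder
                    hourly = false;
                    millis -= DAY_IN_MILLIS;
                } else {               // [dt] -> [dt, hr=23] of the same day
                    hourly = true;
                    millis += DAY_IN_MILLIS - HOUR_IN_MILLIS;
                }
            } else {
                millis -= HOUR_IN_MILLIS;  // plain hourly step
            }
            print(millis, hourly);
        }
    }
}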
diff --git a/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java b/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java
index fee7070cd..a8eb86023 100644
--- a/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java
+++ b/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java
@@ -40,16 +40,25 @@ public class TestLogMessageProducer extends Thread {
     private final String mTopic;
     private final int mNumMessages;
     private final String mType;
+    private final String mMetadataBrokerList;
+    private final int mTimeshift;
 
-    public TestLogMessageProducer(String topic, int numMessages, String type) {
+    public TestLogMessageProducer(String topic, int numMessages, String type,
+                                  String metadataBrokerList, int timeshift) {
         mTopic = topic;
         mNumMessages = numMessages;
         mType = type;
+        mMetadataBrokerList = metadataBrokerList;
+        mTimeshift = timeshift;
     }
 
     public void run() {
         Properties properties = new Properties();
-        properties.put("metadata.broker.list", "localhost:9092");
+        if (mMetadataBrokerList == null || mMetadataBrokerList.isEmpty()) {
+            properties.put("metadata.broker.list", "localhost:9092");
+        } else {
+            properties.put("metadata.broker.list", mMetadataBrokerList);
+        }
         properties.put("partitioner.class", "com.pinterest.secor.tools.RandomPartitioner");
         properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
         properties.put("key.serializer.class", "kafka.serializer.StringEncoder");
@@ -69,7 +78,8 @@ public void run() {
         TSerializer serializer = new TSerializer(protocol);
         for (int i = 0; i < mNumMessages; ++i) {
-            TestMessage testMessage = new TestMessage(System.currentTimeMillis() * 1000000L + i,
+            long time = (System.currentTimeMillis() - mTimeshift * 1000L) * 1000000L + i;
+            TestMessage testMessage = new TestMessage(time,
                     "some_value_" + i);
             if (i % 2 == 0) {
                 testMessage.setEnumField(TestEnum.SOME_VALUE);
diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java
index d98294766..65cb1e617 100644
--- a/src/main/java/com/pinterest/secor/uploader/Uploader.java
+++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java
@@ -199,6 +199,7 @@ private void trimFiles(TopicPartition topicPartition, long startOffset) throws E
     private void checkTopicPartition(TopicPartition topicPartition) throws Exception {
         final long size = mFileRegistry.getSize(topicPartition);
         final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition);
+        LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec);
         if (size >= mConfig.getMaxFileSizeBytes() ||
                 modificationAgeSec >= mConfig.getMaxFileAgeSeconds()) {
             long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
@@ -206,6 +207,7 @@ private void checkTopicPartition(TopicPartition topicPartition) throws Exception
                     newOffsetCount);
             long lastSeenOffset = mOffsetTracker.getLastSeenOffset(topicPartition);
             if (oldOffsetCount == newOffsetCount) {
+                LOG.debug("Uploading for: " + topicPartition);
                 uploadFiles(topicPartition);
             } else if (newOffsetCount > lastSeenOffset) {  // && oldOffset < newOffset
                 LOG.debug("last seen offset {} is lower than committed offset count {}. " +
                           "Deleting files in topic {} partition {}",
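For context, the condition the new debug line reports on is the upload policy from the properties files: a partition's files are flushed once either threshold is crossed. In isolation (the threshold values are the dev defaults; size and age are hypothetical):

// Standalone restatement of the upload trigger checked above.
public class UploadPolicySketch {
    public static void main(String[] args) {
        long maxFileSizeBytes = 10000L;   // secor.max.file.size.bytes (dev)
        long maxFileAgeSeconds = 10L;     // secor.max.file.age.seconds (dev, after this patch)
        long size = 4096L;                // hypothetical accumulated bytes
        long modificationAgeSec = 12L;    // hypothetical seconds since last write
        boolean shouldUpload = size >= maxFileSizeBytes
                || modificationAgeSec >= maxFileAgeSeconds;
        System.out.println("shouldUpload: " + shouldUpload);  // true: age threshold crossed
    }
}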
diff --git a/src/main/scripts/run_tests.sh b/src/main/scripts/run_tests.sh
index 8034fb088..d42f7a8dd 100755
--- a/src/main/scripts/run_tests.sh
+++ b/src/main/scripts/run_tests.sh
@@ -38,7 +38,7 @@ PARENT_DIR=/tmp/secor_dev
 LOGS_DIR=${PARENT_DIR}/logs
 BUCKET=${SECOR_BUCKET:-test-bucket}
 S3_LOGS_DIR=s3://${BUCKET}/secor_dev
-MESSAGES=1000
+MESSAGES=100
 MESSAGE_TYPE=binary
 # For the compression tests to work, set this to the path of the Hadoop native libs.
 HADOOP_NATIVE_LIB_PATH=lib
@@ -46,6 +46,8 @@ HADOOP_NATIVE_LIB_PATH=lib
 ADDITIONAL_OPTS=
 
 # various reader writer options to be used for testing
+# note: associative arrays need bash v4 support
+# declare -A READER_WRITERS
 READER_WRITERS[json]=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory
 READER_WRITERS[binary]=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory
 
@@ -74,9 +76,11 @@ check_for_native_libs() {
 recreate_dirs() {
     run_command "rm -r -f ${PARENT_DIR}"
     if [ -n "${SECOR_LOCAL_S3}" ]; then
-        run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del"
+        run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls -r ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del"
+        run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls -r ${S3_LOGS_DIR}"
     else
         run_command "s3cmd del --recursive ${S3_LOGS_DIR}"
+        run_command "s3cmd ls -r ${S3_LOGS_DIR}"
     fi
     # create logs directory
     if [ ! -d ${LOGS_DIR} ]; then
@@ -88,7 +92,7 @@ start_s3() {
     if [ -n "${SECOR_LOCAL_S3}" ]; then
         if command -v fakes3 > /dev/null 2>&1; then
             run_command "fakes3 --root=/tmp/fakes3 --port=5000 --hostname=localhost > /tmp/fakes3.log 2>&1 &"
-            sleep 2
+            sleep 10
             run_command "s3cmd -c ${CONF_DIR}/test.s3cfg mb s3://${BUCKET}"
         else
             echo "Couldn't find FakeS3 binary, please install it using \`gem install fakes3\`"
@@ -98,7 +102,7 @@ start_s3() {
 stop_s3() {
     if [ -n "${SECOR_LOCAL_S3}" ]; then
-        run_command "pkill -9 'fakes3' > /dev/null 2>&1 || true"
+        run_command "pkill -f 'fakes3' || true"
         run_command "rm -r -f /tmp/fakes3"
     fi
 }
@@ -137,23 +141,46 @@ stop_secor() {
     run_command "pkill -f 'com.pinterest.secor.main.ConsumerMain' || true"
 }
 
+run_finalizer() {
+    run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \
+        -Dconfig=secor.test.partition.properties ${ADDITIONAL_OPTS} -cp $CLASSPATH \
+        com.pinterest.secor.main.PartitionFinalizerMain > ${LOGS_DIR}/finalizer.log 2>&1 "
+    EXIT_CODE=$?
+    if [ ${EXIT_CODE} -ne 0 ]; then
+        echo -e "\e[1;41;97mFinalizer FAILED\e[0m"
+        echo "See log ${LOGS_DIR}/finalizer.log for more details"
+        exit ${EXIT_CODE}
+    fi
+}
+
 create_topic() {
     run_command "${BASE_DIR}/run_kafka_class.sh kafka.admin.TopicCommand --create --zookeeper \
         localhost:2181 --replication-factor 1 --partitions 2 --topic test > \
         ${LOGS_DIR}/create_topic.log 2>&1"
 }
 
+# post messages
+# $1: number of messages
+# $2: timeshift in seconds
 post_messages() {
     run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \
         -Dconfig=secor.test.backup.properties -cp ${CLASSPATH} \
-        com.pinterest.secor.main.TestLogMessageProducerMain -t test -m $1 -p 1 -type ${MESSAGE_TYPE} > \
+        com.pinterest.secor.main.TestLogMessageProducerMain -t test -m $1 -p 1 -type ${MESSAGE_TYPE} -timeshift $2 > \
         ${LOGS_DIR}/test_log_message_producer.log 2>&1"
 }
 
+# verify the messages
+# $1: number of messages
+# $2: expected number of _SUCCESS files
 verify() {
+    echo "Verifying $1 $2"
+    RUNMODE_0="backup"
     if [ "${MESSAGE_TYPE}" = "binary" ]; then
         RUNMODE_1="partition"
+    else
+        RUNMODE_1="backup"
     fi
     for RUNMODE in ${RUNMODE_0} ${RUNMODE_1}; do
         run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \
@@ -165,10 +192,26 @@ verify() {
             echo -e "\e[1;41;97mVerification FAILED\e[0m"
             echo "See log ${LOGS_DIR}/log_verifier_${RUNMODE}.log for more details"
             tail -n 50 ${LOGS_DIR}/log_verifier_${RUNMODE}.log
-            stop_all
-            stop_s3
+            echo "See log ${LOGS_DIR}/secor_${RUNMODE}.log for more details"
+            tail -n 50 ${LOGS_DIR}/secor_${RUNMODE}.log
+            echo "See log ${LOGS_DIR}/test_log_message_producer.log for more details"
+            tail -n 50 ${LOGS_DIR}/test_log_message_producer.log
             exit ${VERIFICATION_EXIT_CODE}
         fi
+
+        # Verify the _SUCCESS file count
+        if [ -n "${SECOR_LOCAL_S3}" ]; then
+            run_command "s3cmd ls -c ${CONF_DIR}/test.s3cfg -r ${S3_LOGS_DIR} | grep _SUCCESS | wc -l > /tmp/secor_tests_output.txt"
+        else
+            run_command "s3cmd ls -r ${S3_LOGS_DIR} | grep _SUCCESS | wc -l > /tmp/secor_tests_output.txt"
+        fi
+        count=$(< /tmp/secor_tests_output.txt)
+        if [ "${count}" -ne "$2" ]; then
+            echo -e "\e[1;41;97mVerification FAILED\e[0m"
+            echo "Number of _SUCCESS files is ${count}, expected $2"
+            exit 1
+        fi
     done
 }
diff --git a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java
--- a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java
+    @Test
+    public void testDailyGetFinalizedUptoPartitions() throws Exception {
+        JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
+
+        List<Message> lastMessages = new ArrayList<Message>();
+        lastMessages.add(mMessageWithSecondsTimestamp);
+        List<Message> committedMessages = new ArrayList<Message>();
+        committedMessages.add(mMessageWithMillisTimestamp);
+        String uptoPartitions[] = jsonMessageParser.getFinalizedUptoPartitions(lastMessages,
+            committedMessages);
+        assertEquals(1, uptoPartitions.length);
+        assertEquals("dt=2014-07-21", uptoPartitions[0]);
+
+        String[] previous = jsonMessageParser.getPreviousPartitions(uptoPartitions);
+        assertEquals(1, previous.length);
+        assertEquals("dt=2014-07-20", previous[0]);
+    }
+
+    @Test
+    public void testHourlyGetFinalizedUptoPartitions() throws Exception {
+        Mockito.when(TimestampedMessageParser.usingHourly(mConfig)).thenReturn(true);
+        JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
+
+        List<Message> lastMessages = new ArrayList<Message>();
+        lastMessages.add(mMessageWithSecondsTimestamp);
+        List<Message> committedMessages = new ArrayList<Message>();
+        committedMessages.add(mMessageWithMillisTimestamp);
+        String uptoPartitions[] = jsonMessageParser.getFinalizedUptoPartitions(lastMessages,
+            committedMessages);
+        assertEquals(2, uptoPartitions.length);
+        assertEquals("dt=2014-07-21", uptoPartitions[0]);
+        assertEquals("hr=01", uptoPartitions[1]);
+
+        String[][] expectedPartitions = new String[][] {
+            new String[]{"dt=2014-07-21", "hr=00"},
+            new String[]{"dt=2014-07-20"},  // the day folder for the previous day
+            new String[]{"dt=2014-07-20", "hr=23"},
+            new String[]{"dt=2014-07-20", "hr=22"},
+            new String[]{"dt=2014-07-20", "hr=21"},
+            new String[]{"dt=2014-07-20", "hr=20"},
+            new String[]{"dt=2014-07-20", "hr=19"},
+            new String[]{"dt=2014-07-20", "hr=18"},
+            new String[]{"dt=2014-07-20", "hr=17"},
+            new String[]{"dt=2014-07-20", "hr=16"},
+            new String[]{"dt=2014-07-20", "hr=15"},
+            new String[]{"dt=2014-07-20", "hr=14"},
+            new String[]{"dt=2014-07-20", "hr=13"},
+            new String[]{"dt=2014-07-20", "hr=12"},
+            new String[]{"dt=2014-07-20", "hr=11"},
+            new String[]{"dt=2014-07-20", "hr=10"},
+            new String[]{"dt=2014-07-20", "hr=09"},
+            new String[]{"dt=2014-07-20", "hr=08"},
+            new String[]{"dt=2014-07-20", "hr=07"},
+            new String[]{"dt=2014-07-20", "hr=06"},
+            new String[]{"dt=2014-07-20", "hr=05"},
+            new String[]{"dt=2014-07-20", "hr=04"},
+            new String[]{"dt=2014-07-20", "hr=03"},
+            new String[]{"dt=2014-07-20", "hr=02"},
+            new String[]{"dt=2014-07-20", "hr=01"},
+            new String[]{"dt=2014-07-20", "hr=00"},
+            new String[]{"dt=2014-07-19"},  // the day folder for the 2nd last day
+            new String[]{"dt=2014-07-19", "hr=23"}
+        };
+
+        String[] partitions = uptoPartitions;
+        List<String[]> partitionsList = new ArrayList<String[]>();
+        for (int i = 0; i < 28; i++) {
+            String[] previous = jsonMessageParser.getPreviousPartitions(partitions);
+            partitionsList.add(previous);
+            partitions = previous;
+        }
+
+        assertEquals(partitionsList.size(), expectedPartitions.length);
+        for (int i = 0; i < partitionsList.size(); i++) {
+            List<String> expectedPartition = Arrays.asList(expectedPartitions[i]);
+            List<String> retrievedPartition = Arrays.asList(partitionsList.get(i));
+            assertEquals(expectedPartition, retrievedPartition);
+        }
+    }
+}