diff --git a/src/main/config/secor.prod.properties b/src/main/config/secor.prod.properties index 0801a5c99..56047cb57 100644 --- a/src/main/config/secor.prod.properties +++ b/src/main/config/secor.prod.properties @@ -37,5 +37,6 @@ zookeeper.quorum= # 200MB secor.max.file.size.bytes=200000000 # 1 hour +# for hourly ingestion/finalization, set this property to smaller value, e.g. 1800 secor.max.file.age.seconds=3600 diff --git a/src/main/java/com/pinterest/secor/common/LogFilePath.java b/src/main/java/com/pinterest/secor/common/LogFilePath.java index 367dc03f4..753288c0d 100644 --- a/src/main/java/com/pinterest/secor/common/LogFilePath.java +++ b/src/main/java/com/pinterest/secor/common/LogFilePath.java @@ -42,13 +42,13 @@ * @author Pawel Garbacki (pawel@pinterest.com) */ public class LogFilePath { - private String mPrefix; - private String mTopic; - private String[] mPartitions; - private int mGeneration; - private int mKafkaPartition; - private long mOffset; - private String mExtension; + private final String mPrefix; + private final String mTopic; + private final String[] mPartitions; + private final int mGeneration; + private final int mKafkaPartition; + private final long mOffset; + private final String mExtension; public LogFilePath(String prefix, int generation, long lastCommittedOffset, ParsedMessage message, String extension) { diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index d3fe22147..7d66ddd61 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -215,6 +215,14 @@ public boolean getMessageTimestampUsingHour() { return getBoolean("message.timestamp.using.hour", false); } + public int getFinalizerLagSecond() { + return getInt("finalizer.lag.second", 3600); + } + + public int getFinalizerLookbackPeriods() { + return getInt("finalizer.lookback.periods", 10); + } + public String getHivePrefix() { return getString("secor.hive.prefix"); } @@ -256,12 +264,12 @@ private int getInt(String name) { return mProperties.getInt(name); } - private boolean getBoolean(String name, boolean required) { - if (required) { - checkProperty(name); - return mProperties.getBoolean(name); - } - return mProperties.getBoolean(name, false); + private int getInt(String name, int defaultValue) { + return mProperties.getInt(name, defaultValue); + } + + private boolean getBoolean(String name, boolean defaultValue) { + return mProperties.getBoolean(name, defaultValue); } private long getLong(String name) { diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java index fd85bbf3b..b9297d589 100644 --- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java +++ b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java @@ -25,12 +25,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Stack; /** * Partition finalizer writes _SUCCESS files to date partitions that very likely won't be receiving @@ -41,13 +39,13 @@ public class PartitionFinalizer { private static final Logger LOG = LoggerFactory.getLogger(PartitionFinalizer.class); - private SecorConfig mConfig; - private ZookeeperConnector mZookeeperConnector; - private TimestampedMessageParser mMessageParser; - private KafkaClient mKafkaClient; - private QuboleClient mQuboleClient; - private String mFileExtension; - private final boolean usingHourly; + private final SecorConfig mConfig; + private final ZookeeperConnector mZookeeperConnector; + private final TimestampedMessageParser mMessageParser; + private final KafkaClient mKafkaClient; + private final QuboleClient mQuboleClient; + private final String mFileExtension; + private final int mLookbackPeriods; public PartitionFinalizer(SecorConfig config) throws Exception { mConfig = config; @@ -62,216 +60,123 @@ public PartitionFinalizer(SecorConfig config) throws Exception { } else { mFileExtension = ""; } - usingHourly = config.getMessageTimestampUsingHour(); + mLookbackPeriods = config.getFinalizerLookbackPeriods(); + LOG.info("Lookback periods: " + mLookbackPeriods); } - private long getLastTimestampMillis(TopicPartition topicPartition) throws Exception { - Message message = mKafkaClient.getLastMessage(topicPartition); - if (message == null) { - // This will happen if no messages have been posted to the given topic partition. - LOG.error("No message found for topic {} partition {}" + topicPartition.getTopic(), topicPartition.getPartition()); - return -1; - } - return mMessageParser.extractTimestampMillis(message); - } - - private long getLastTimestampMillis(String topic) throws Exception { - final int numPartitions = mKafkaClient.getNumPartitions(topic); - long max_timestamp = Long.MIN_VALUE; - for (int partition = 0; partition < numPartitions; ++partition) { - TopicPartition topicPartition = new TopicPartition(topic, partition); - long timestamp = getLastTimestampMillis(topicPartition); - if (timestamp > max_timestamp) { - max_timestamp = timestamp; - } - } - if (max_timestamp == Long.MIN_VALUE) { - return -1; - } - return max_timestamp; - } - - private long getCommittedTimestampMillis(TopicPartition topicPartition) throws Exception { - Message message = mKafkaClient.getCommittedMessage(topicPartition); - if (message == null) { - LOG.error("No message found for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition()); - return -1; - } - return mMessageParser.extractTimestampMillis(message); - } - - private long getCommittedTimestampMillis(String topic) throws Exception { + private String[] getFinalizedUptoPartitions(String topic) throws Exception { final int numPartitions = mKafkaClient.getNumPartitions(topic); - long minTimestamp = Long.MAX_VALUE; + List lastMessages = new ArrayList(numPartitions); + List committedMessages = new ArrayList(numPartitions); for (int partition = 0; partition < numPartitions; ++partition) { TopicPartition topicPartition = new TopicPartition(topic, partition); - long timestamp = getCommittedTimestampMillis(topicPartition); - if (timestamp == -1) { - return -1; - } else { - if (timestamp < minTimestamp) { - minTimestamp = timestamp; - } + Message lastMessage = mKafkaClient.getLastMessage(topicPartition); + Message committedMessage = mKafkaClient.getCommittedMessage(topicPartition); + if (lastMessage == null || committedMessage == null) { + // This will happen if no messages have been posted to the given topic partition. + LOG.error("For topic {} partition {}, lastMessage: {}, commmitted: {}", + topicPartition.getTopic(), topicPartition.getPartition(), + lastMessage, committedMessage); + continue; } + lastMessages.add(lastMessage); + committedMessages.add(committedMessage); } - if (minTimestamp == Long.MAX_VALUE) { - return -1; - } - return minTimestamp; + return mMessageParser.getFinalizedUptoPartitions(lastMessages, committedMessages); } - private NavigableSet getPartitions(String topic) throws IOException, ParseException { + private void finalizePartitionsUpTo(String topic, String[] partitions) throws Exception { final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path(); - String[] partitions = usingHourly ? new String[]{"dt=", "hr="} : new String[]{"dt="}; - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, - mConfig.getGeneration(), 0, 0, mFileExtension); - String parentDir = logFilePath.getLogFileParentDir(); - String[] partitionDirs = FileUtil.list(parentDir); - if (usingHourly) { - List dirs = new ArrayList(); - for (String partionDir : partitionDirs) { - dirs.addAll(Arrays.asList(FileUtil.list(partionDir))); - } - partitionDirs = dirs.toArray(new String[dirs.size()]); - } - String patternStr = ".*/dt=(\\d\\d\\d\\d-\\d\\d-\\d\\d)"; - if (usingHourly) { - patternStr += "/hr=(\\d\\d)"; - } - patternStr += "$"; - LOG.info("patternStr: " + patternStr); - Pattern pattern = Pattern.compile(patternStr); - TreeSet result = new TreeSet(); - for (String partitionDir : partitionDirs) { - Matcher matcher = pattern.matcher(partitionDir); - if (matcher.find()) { - String date = matcher.group(1); - String hour = usingHourly ? matcher.group(2) : "00"; - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH"); - format.setTimeZone(TimeZone.getTimeZone("UTC")); - Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - calendar.setTime(format.parse(date + "-" + hour)); - result.add(calendar); - } - } - return result; - } - private void finalizePartitionsUpTo(String topic, Calendar calendar) throws IOException, - ParseException, InterruptedException { - NavigableSet partitionDates = - getPartitions(topic).headSet(calendar, true).descendingSet(); - final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path(); - SimpleDateFormat dtFormat = new SimpleDateFormat("yyyy-MM-dd"); - SimpleDateFormat hrFormat = new SimpleDateFormat("HH"); - dtFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - hrFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - NavigableSet finishedDates = new TreeSet(); - for (Calendar partition : partitionDates) { - String dtPartitionStr = dtFormat.format(partition.getTime()); - String hrPartitionStr = hrFormat.format(partition.getTime()); - String[] partitions = usingHourly - ? new String[]{"dt=" + dtPartitionStr, "hr=" + hrPartitionStr} - : new String[]{"dt=" + dtPartitionStr}; - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, - mConfig.getGeneration(), 0, 0, mFileExtension); - String logFileDir = logFilePath.getLogFileDir(); - assert FileUtil.exists(logFileDir) : "FileUtil.exists(" + logFileDir + ")"; - String successFilePath = logFileDir + "/_SUCCESS"; - if (FileUtil.exists(successFilePath)) { - LOG.info("File exist already, short circuit return. " + successFilePath); - break; - } - try { - String parStr = "dt='" + dtPartitionStr + "'"; - if (usingHourly) { - parStr += ", hr='" + hrPartitionStr + "'"; - } - String hivePrefix = null; - try { - hivePrefix = mConfig.getHivePrefix(); - } catch (RuntimeException ex) { - LOG.warn("HivePrefix is not defined. Skip hive registration"); - } - if (hivePrefix != null) { - mQuboleClient.addPartition(hivePrefix + topic, parStr); + // Walk through each dimension of the partitions array + // First finalize the hourly partitions, and then move on to daily partitions + for (int dim = partitions.length; dim > 0; dim--) { + String[] uptoPartitions = Arrays.copyOf(partitions, dim); + LOG.info("Finalize up to (but not include) {} for dim: {}", uptoPartitions, dim); + + String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions); + Stack toBeFinalized = new Stack(); + // Walk backwards to collect all partitions which are previous to the upTo partition + // Do not include the upTo partition + // Stop at the first partition which already have the SUCCESS file + for (int i = 0; i < mLookbackPeriods; i++) { + LOG.info("Looking for partition: " + Arrays.toString(previous)); + LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous, + mConfig.getGeneration(), 0, 0, mFileExtension); + String logFileDir = logFilePath.getLogFileDir(); + if (FileUtil.exists(logFileDir)) { + String successFilePath = logFileDir + "/_SUCCESS"; + if (FileUtil.exists(successFilePath)) { + LOG.info( + "SuccessFile exist already, short circuit return. " + successFilePath); + break; + } + LOG.info("Folder {} exists and ready to be finalized.", logFileDir); + toBeFinalized.push(previous); + } else { + LOG.info("Folder {} doesn't exist, skip", logFileDir); } - } catch (Exception e) { - LOG.error("failed to finalize topic " + topic - + " partition dt=" + dtPartitionStr + " hr=" + hrPartitionStr, - e); - continue; + previous = mMessageParser.getPreviousPartitions(previous); } - LOG.info("touching file {}", successFilePath); - FileUtil.touch(successFilePath); - - // We need to mark the successFile for the dt folder as well - if (usingHourly) { - Calendar yesterday = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - yesterday.setTimeInMillis(partition.getTimeInMillis()); - yesterday.add(Calendar.DAY_OF_MONTH, -1); - finishedDates.add(yesterday); + LOG.info("To be finalized partitions: {}", toBeFinalized); + if (toBeFinalized.isEmpty()) { + LOG.warn("There is no partitions to be finalized for dim: {}", dim); + continue; } - } - // Reverse order to enable short circuit return - finishedDates = finishedDates.descendingSet(); - for (Calendar partition : finishedDates) { - String dtPartitionStr = dtFormat.format(partition.getTime()); - String[] partitions = new String[]{"dt=" + dtPartitionStr}; - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, - mConfig.getGeneration(), 0, 0, mFileExtension); - String logFileDir = logFilePath.getLogFileDir(); - String successFilePath = logFileDir + "/_SUCCESS"; - if (FileUtil.exists(successFilePath)) { - LOG.info("File exist already, short circuit return. " + successFilePath); - break; - } - LOG.info("touching file " + successFilePath); - FileUtil.touch(successFilePath); - } - } + // Now walk forward the collected partitions to do the finalization + // Note we are deliberately walking backwards and then forwards to make sure we don't + // end up in a situation that a later date partition is finalized and then the system + // crashes (which creates unfinalized partition folders in between) + while (!toBeFinalized.isEmpty()) { + String[] current = toBeFinalized.pop(); + LOG.info("Finalizing partition: " + Arrays.toString(current)); + // We only perform hive registration on the last dimension of the partition array + // i.e. only do hive registration for the hourly folder, but not for the daily + if (dim == partitions.length) { + try { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < current.length; i++) { + String par = current[i]; + // We expect the partition array in the form of key=value if + // they need to go through hive registration + String[] parts = par.split("="); + assert parts.length == 2 : "wrong partition format: " + par; + if (i > 0) { + sb.append(","); + } + sb.append(parts[0]); + sb.append("='"); + sb.append(parts[1]); + sb.append("'"); + } + LOG.info("Hive partition string: " + sb); + String hivePrefix = null; + try { + hivePrefix = mConfig.getHivePrefix(); + } catch (RuntimeException ex) { + LOG.warn("HivePrefix is not defined. Skip hive registration"); + } + if (hivePrefix != null) { + mQuboleClient.addPartition(hivePrefix + topic, sb.toString()); + } + } catch (Exception e) { + LOG.error("failed to finalize topic " + topic, e); + continue; + } + } - /** - * Get finalized timestamp for a given topic partition. Finalized timestamp is the current time - * if the last offset for that topic partition has been committed earlier than an hour ago. - * Otherwise, finalized timestamp is the committed timestamp. - * - * @param topicPartition The topic partition for which we want to compute the finalized - * timestamp. - * @return The finalized timestamp for the topic partition. - * @throws Exception - */ - private long getFinalizedTimestampMillis(TopicPartition topicPartition) throws Exception { - long lastTimestamp = getLastTimestampMillis(topicPartition); - long committedTimestamp = getCommittedTimestampMillis(topicPartition); - long now = System.currentTimeMillis(); - if (lastTimestamp == committedTimestamp && (now - lastTimestamp) > 3600 * 1000) { - return now; - } - return committedTimestamp; - } + // Generate the SUCCESS file at the end + LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current, + mConfig.getGeneration(), 0, 0, mFileExtension); + String logFileDir = logFilePath.getLogFileDir(); + String successFilePath = logFileDir + "/_SUCCESS"; - private long getFinalizedTimestampMillis(String topic) throws Exception { - final int numPartitions = mKafkaClient.getNumPartitions(topic); - long minTimestamp = Long.MAX_VALUE; - for (int partition = 0; partition < numPartitions; ++partition) { - TopicPartition topicPartition = new TopicPartition(topic, partition); - long timestamp = getFinalizedTimestampMillis(topicPartition); - LOG.info("finalized timestamp for topic {} partition {} is {}", topic, partition, timestamp); - if (timestamp == -1) { - return -1; - } else { - if (timestamp < minTimestamp) { - minTimestamp = timestamp; - } + LOG.info("touching file {}", successFilePath); + FileUtil.touch(successFilePath); } } - if (minTimestamp == Long.MAX_VALUE) { - return -1; - } - return minTimestamp; } public void finalizePartitions() throws Exception { @@ -281,17 +186,10 @@ public void finalizePartitions() throws Exception { LOG.info("skipping topic {}", topic); } else { LOG.info("finalizing topic {}", topic); - long finalizedTimestampMillis = getFinalizedTimestampMillis(topic); - LOG.info("finalized timestamp for topic {} is {}", topic , finalizedTimestampMillis); - if (finalizedTimestampMillis != -1) { - Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - calendar.setTimeInMillis(finalizedTimestampMillis); - if (!usingHourly) { - calendar.add(Calendar.DAY_OF_MONTH, -1); - } - // Introduce a lag of one hour. - calendar.add(Calendar.HOUR, -1); - finalizePartitionsUpTo(topic, calendar); + String[] partitions = getFinalizedUptoPartitions(topic); + LOG.info("finalized timestamp for topic {} is {}", topic , partitions); + if (partitions != null) { + finalizePartitionsUpTo(topic, partitions); } } } diff --git a/src/main/java/com/pinterest/secor/parser/Partitioner.java b/src/main/java/com/pinterest/secor/parser/Partitioner.java new file mode 100644 index 000000000..d7a4f78e9 --- /dev/null +++ b/src/main/java/com/pinterest/secor/parser/Partitioner.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.secor.parser; + +import com.pinterest.secor.message.Message; + +import java.util.List; + +/** + * The Partitioner knows when to finalize a file folder partition. + * + * A file folder partition (e.g. dt=2015-07-07) can be finalized when all + * messages in that date arrived. The caller (PartitionFinalizer) will do the + * finalization work (e.g. generate _SUCCESS file, perform hive registration) + * + * The partitioner provide the method to calculate the range of file + * folder partitions to be finalized and provide the method to iterate through + * the range. + * + * The caller will first provide a list of last-consumed messages for a given + * kafka topic and call #getFinalizedUptoPartitions to get the finalized-up-to + * partition and then walk backwards by calling #getPreviousPartitions to + * collect all the previous partitions which are ready to be finalized. + * + * Note that finalize-up-to partition itself is not inclusive in the range of + * partitions to be finalized. + * + * The caller might repeat this loop multiple times when the filesystem partition + * is multi-dimentional (e.g. [dt=2015-07-07,hr=05]). it will loop once for the + * hourly folder finalization and another time for the daily folder. + * + * Note that although we use daily/hourly partition illustrate the use of + * partitioner, it is be no means the partitioner can only work with timestamp + * based partitioning, it should also be able to work with offset based + * partitioning as long as we establish an iterating order within those + * partitions. + * + * @author Henry Cai (hcai@pinterest.com) + */ +public interface Partitioner { + /** + * Calculates the partition to finalize-up-to from a list of last-consumed + * messages and a list of last-enqueued messages. + * + * For each kafka topic/partition for a given topic, the caller will provide + * two messages: + * * lastMessage: the last message at the tail of the kafka queue + * * committedMessage: the message secor consumed and committed + * And then iterate over all the kafka topic partitions for the given topic, + * the caller will gather the above two messages into two lists. + * + * The Partitioner will compare the messages from all kafka partitions to + * see which one is the earliest to finalize up to. The partitioner will + * normally use the timestamp from the committedMessage to decide + * the finalize time. But for some slow topics where there is no new + * messages coming for a while (i.e. lastMessage == committedMessage), + * the partitioner can use the current time as the finalize time. + * + * Note that the up-to partition itself is not inclusive in the range to be + * finalized. For example, when the last message is in 2015-07-07, + * 7/7 itself is not complete yet. + * + * Note also that the partitioner might want to adjust down the finalize + * time to allow a safety lag for late arrival messages. e.g. adding one + * extra hour lag + * + * @param lastMessages the last message at the tail of the queue + * @param committedMessages the message secor consumed and committed + * + * @return a String array to represent a file folder partition to finalize up to + */ + String[] getFinalizedUptoPartitions(List lastMessages, + List committedMessages) throws Exception; + + /** + * Get the previous partition out of the incoming partition. + * E.g. for ["dt=2015-07-07","hr=05"], it will return ["dt=2015-07-07","hr=04"] + * + * @param partition + * @return + */ + String[] getPreviousPartitions(String[] partition) throws Exception; +} diff --git a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java index ff45295a1..a13a252db 100644 --- a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java +++ b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java @@ -19,23 +19,36 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimeZone; -public abstract class TimestampedMessageParser extends MessageParser { +public abstract class TimestampedMessageParser extends MessageParser implements Partitioner { + + private static final Logger LOG = LoggerFactory.getLogger(TimestampedMessageParser.class); + + private static final SimpleDateFormat mDtFormatter = new SimpleDateFormat("yyyy-MM-dd"); + private static final SimpleDateFormat mHrFormatter = new SimpleDateFormat("HH"); + private static final SimpleDateFormat mDtHrFormatter = new SimpleDateFormat("yyyy-MM-dd-HH"); + + static { + mDtFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); + mHrFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); + mDtHrFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); + } - private final SimpleDateFormat dtFormatter; - private final SimpleDateFormat hrFormatter; - private final boolean usingHourly; + private final boolean mUsingHourly; + private final long mLagInSeconds; public TimestampedMessageParser(SecorConfig config) { super(config); - dtFormatter = new SimpleDateFormat("yyyy-MM-dd"); - dtFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); - hrFormatter = new SimpleDateFormat("HH"); - hrFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); - usingHourly = config.getMessageTimestampUsingHour(); + mUsingHourly = config.getMessageTimestampUsingHour(); + mLagInSeconds = config.getFinalizerLagSecond(); + LOG.info("UsingHourly: {}, lagInSeconds: {} ", mUsingHourly, mLagInSeconds); } public abstract long extractTimestampMillis(final Message message) throws Exception; @@ -54,17 +67,87 @@ protected static long toMillis(final long timestamp) { return timestampMillis; } + protected String[] generatePartitions(long timestampMillis, boolean usingHourly) + throws Exception { + Date date = new Date(timestampMillis); + String dt = "dt=" + mDtFormatter.format(date); + String hr = "hr=" + mHrFormatter.format(date); + if (usingHourly) { + return new String[]{dt, hr}; + } else { + return new String[]{dt}; + } + } + + protected long parsePartitions(String[] partitions) throws Exception { + String dtValue = partitions[0].split("=")[1]; + String hrValue = partitions.length > 1 ? partitions[1].split("=")[1] : "00"; + String value = dtValue + "-" + hrValue; + Date date = mDtHrFormatter.parse(value); + return date.getTime(); + } + @Override public String[] extractPartitions(Message message) throws Exception { // Date constructor takes milliseconds since epoch. long timestampMillis = extractTimestampMillis(message); - Date date = new Date(timestampMillis); - String dt = "dt=" + dtFormatter.format(date); - String hr = "hr=" + hrFormatter.format(date); - if (usingHourly) { - return new String[]{dt, hr}; + return generatePartitions(timestampMillis, mUsingHourly); + } + + private long getFinalizedTimestampMillis(Message lastMessage, + Message committedMessage) throws Exception { + long lastTimestamp = extractTimestampMillis(lastMessage); + long committedTimestamp = extractTimestampMillis(committedMessage); + long now = System.currentTimeMillis(); + if (lastTimestamp == committedTimestamp && (now - lastTimestamp) > 3600 * 1000) { + LOG.info("No new message coming, use the current time: " + now); + return now; + } + return committedTimestamp; + } + + @Override + public String[] getFinalizedUptoPartitions(List lastMessages, + List committedMessages) throws Exception { + if (lastMessages == null || committedMessages == null) { + LOG.error("Either: {} and {} is null", lastMessages, + committedMessages); + return null; + } + assert lastMessages.size() == committedMessages.size(); + + long minMillis = Long.MAX_VALUE; + for (int i = 0; i < lastMessages.size(); i++) { + long millis = getFinalizedTimestampMillis(lastMessages.get(i), + committedMessages.get(i)); + if (millis < minMillis) { + minMillis = millis; + } + } + if (minMillis == Long.MAX_VALUE) { + LOG.error("No valid timestamps among messages: {} and {}", lastMessages, + committedMessages); + return null; + } + + // add the safety lag for late-arrival messages + long lag = mLagInSeconds * 1000L; + LOG.info("Originally: {}, adjust down {}", minMillis, lag); + return generatePartitions(minMillis - lag, mUsingHourly); + } + + @Override + public String[] getPreviousPartitions(String[] partitions) throws Exception { + long millis = parsePartitions(partitions); + long delta; + if (partitions.length == 1) { + delta = 3600L * 24 * 1000L; + } else if (partitions.length == 2) { + delta = 3600L * 1000L; } else { - return new String[]{dt}; + throw new RuntimeException("Unsupported partitions: " + partitions.length); } + return generatePartitions(millis - delta, partitions.length == 2); } + } diff --git a/src/main/scripts/run_tests.sh b/src/main/scripts/run_tests.sh index 0e032d96c..abe6c37b5 100755 --- a/src/main/scripts/run_tests.sh +++ b/src/main/scripts/run_tests.sh @@ -46,15 +46,11 @@ HADOOP_NATIVE_LIB_PATH=lib ADDITIONAL_OPTS= # various reader writer options to be used for testing +# note associate array needs bash v4 support # -# However older bash (ver <= 3) does not support associative array, falls back -# to use two arrays -# declare -A READER_WRITERS -# -declare -a READER_WRITER_KEYS -READER_WRITER_KEYS=(json binary) -declare -a READER_WRITERS -READER_WRITERS=(com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory) +declare -A READER_WRITERS +READER_WRITERS[json]=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory +READER_WRITERS[binary]=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory # The minimum wait time is one minute plus delta. Secor is configured to upload files older than # one minute and we need to make sure that everything ends up on s3 before starting verification. @@ -80,9 +76,11 @@ check_for_native_libs() { recreate_dirs() { run_command "rm -r -f ${PARENT_DIR}" if [ -n "${SECOR_LOCAL_S3}" ]; then - run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del" + run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls -r ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del" + run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls -r ${S3_LOGS_DIR}" else run_command "s3cmd del --recursive ${S3_LOGS_DIR}" + run_command "s3cmd ls -r ${S3_LOGS_DIR}" fi # create logs directory if [ ! -d ${LOGS_DIR} ]; then @@ -94,7 +92,7 @@ start_s3() { if [ -n "${SECOR_LOCAL_S3}" ]; then if command -v fakes3 > /dev/null 2>&1; then run_command "fakes3 --root=/tmp/fakes3 --port=5000 --hostname=localhost > /tmp/fakes3.log 2>&1 &" - sleep 2 + sleep 10 run_command "s3cmd -c ${CONF_DIR}/test.s3cfg mb s3://${BUCKET}" else echo "Couldn't find FakeS3 binary, please install it using `gem install fakes3`" @@ -104,7 +102,7 @@ start_s3() { stop_s3() { if [ -n "${SECOR_LOCAL_S3}" ]; then - run_command "pkill -9 'fakes3' > /dev/null 2>&1 || true" + run_command "pkill -f 'fakes3' || true" run_command "rm -r -f /tmp/fakes3" fi } @@ -145,7 +143,7 @@ stop_secor() { run_finalizer() { run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \ - -Dconfig=secor.test.partition.properties ${ADDITIONAL_OPTS} -cp secor-0.1-SNAPSHOT.jar:lib/* \ + -Dconfig=secor.test.partition.properties ${ADDITIONAL_OPTS} -cp $CLASSPATH \ com.pinterest.secor.main.PartitionFinalizerMain > ${LOGS_DIR}/finalizer.log 2>&1 " EXIT_CODE=$? @@ -167,7 +165,7 @@ create_topic() { # $2 timeshift in seconds post_messages() { run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \ - -Dconfig=secor.test.backup.properties -cp secor-0.1-SNAPSHOT.jar:lib/* \ + -Dconfig=secor.test.backup.properties -cp ${CLASSPATH} \ com.pinterest.secor.main.TestLogMessageProducerMain -t test -m $1 -p 1 -type ${MESSAGE_TYPE} -timeshift $2 > \ ${LOGS_DIR}/test_log_message_producer.log 2>&1" } @@ -194,15 +192,12 @@ verify() { echo -e "\e[1;41;97mVerification FAILED\e[0m" echo "See log ${LOGS_DIR}/log_verifier_${RUNMODE}.log for more details" tail -n 50 ${LOGS_DIR}/log_verifier_${RUNMODE}.log - stop_all - stop_s3 exit ${VERIFICATION_EXIT_CODE} fi # Verify SUCCESS file if [ -n "${SECOR_LOCAL_S3}" ]; then - run_command "s3cmd ls -c ${CONF_DIR}/test.s3cfg -r ${S3_LOGS_DIR} | gr -ep _SUCCESS | wc -l > /tmp/secor_tests_output.txt" + run_command "s3cmd ls -c ${CONF_DIR}/test.s3cfg -r ${S3_LOGS_DIR} | grep _SUCCESS | wc -l > /tmp/secor_tests_output.txt" else run_command "s3cmd ls -r ${S3_LOGS_DIR} | grep _SUCCESS | wc -l > /tmp/secor_tests_output.txt" fi @@ -275,7 +270,6 @@ post_and_verify_test() { # Post some messages and run the finalizer, count # of messages and success file # $1: hr or dt, decides whether it's hourly or daily folder finalization -# $2: number of messages post_and_finalizer_verify_test() { echo "********************************************************" date=$(date -u +'%Y-%m-%d %H:%M:%S') @@ -291,41 +285,32 @@ post_and_finalizer_verify_test() { return fi + HOUR_TIMESHIFT=$((3600+3600)) + DAY_TIMESHIFT=$((86400+3600)) + + OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS} + if [ $1 = "hr" ]; then - ADDITIONAL_OPTS="-Dsecor.file.reader.writer.factory=${READER_WRITERS[${num}]} -Dmessage.timestamp.using.hour=true" - # for hr folder, the finalizer lag is 1 hour - TIMESHIFT=$((3600+1200)) - if [ $2 -ne 0 ]; then - # should be 2 success files, one for hr folder, - # one for yesterday's dt folder - FILES=2 - else - # should be 0 success files - FILES=0 - fi + ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dmessage.timestamp.using.hour=true -Dfinalizer.lookback.periods=30" + # should be 2 success files for hr folder, 1 for dt folder + FILES=3 else - ADDITIONAL_OPTS="-Dsecor.file.reader.writer.factory=${READER_WRITERS[${num}]}" - # for dt folder, the finalizer lag is 1 day + 1 hour - TIMESHIFT=$((86400+3600+1200)) - if [ $2 -ne 0 ]; then - # should be 1 success files - FILES=1 - else - # should be 0 success files - FILES=0 - fi + # should be 1 success files for dt folder + FILES=1 fi echo "Expected success file: $FILES" - echo "running post_and_finalizer_verify_test $1 $2" + echo "running post_and_finalizer_verify_test $1" initialize start_secor sleep 3 - # post some older messages - post_messages $2 ${TIMESHIFT} - # post some newer messages + # post some messages for yesterday + post_messages ${MESSAGES} ${DAY_TIMESHIFT} + # post some messages for last hour + post_messages ${MESSAGES} ${HOUR_TIMESHIFT} + # post some current messages post_messages ${MESSAGES} 0 echo "Waiting ${WAIT_TIME} sec for Secor to upload logs to s3" @@ -334,9 +319,11 @@ post_and_finalizer_verify_test() { echo "start finalizer" run_finalizer - verify $((0+$2+${MESSAGES})) ${FILES} + verify $((${MESSAGES}*3)) ${FILES} stop_all + ADDITIONAL_OPTS=${OLD_ADDITIONAL_OPTS} + echo -e "\e[1;42;97mpost_and_finalizer_verify_test succeeded\e[0m" } @@ -364,18 +351,23 @@ move_offset_back_test() { echo "running move_offset_back_test" initialize + OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS} + ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dsecor.max.file.age.seconds=30" + start_secor sleep 3 post_messages $((${MESSAGES}/10)) 0 set_offsets_in_zookeeper 2 post_messages $((${MESSAGES}*9/10)) 0 - echo "Waiting ${WAIT_TIME} sec for Secor to upload logs to s3" - sleep ${WAIT_TIME} + echo "Waiting $((${WAIT_TIME}*3)) sec for Secor to upload logs to s3" + sleep $((${WAIT_TIME}*3)) # 4 because we skipped 2 messages per topic partition and there are 2 partitions per topic. verify $((${MESSAGES}-4)) 0 stop_all + ADDITIONAL_OPTS=${OLD_ADDITIONAL_OPTS} + echo -e "\e[1;42;97mmove_offset_back_test succeeded\e[0m" } @@ -385,6 +377,8 @@ post_and_verify_compressed_test() { echo "running post_and_verify_compressed_test" initialize + OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS} + # add compression options ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dsecor.compression.codec=org.apache.hadoop.io.compress.GzipCodec \ -Djava.library.path=$HADOOP_NATIVE_LIB_PATH" @@ -396,31 +390,26 @@ post_and_verify_compressed_test() { verify ${MESSAGES} 0 stop_all + ADDITIONAL_OPTS=${OLD_ADDITIONAL_OPTS} + echo -e "\e[1;42;97mpost_and_verify_compressed_test succeeded\e[0m" } check_for_native_libs +stop_s3 start_s3 -# Testing finalizer in partition mode -num=1 -MESSAGE_TYPE=${READER_WRITER_KEYS[${num}]} -echo "********************************************************" -echo "Running hourly finalizer tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${num}]}" -post_and_finalizer_verify_test hr 0 -post_and_finalizer_verify_test hr 100 - -echo "********************************************************" -echo "Running daily finalizer tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${num}]}" -post_and_finalizer_verify_test dt 0 -post_and_finalizer_verify_test dt 100 - -for num in 0 ${#READER_WRITERS[@]}-1; do - MESSAGE_TYPE=${READER_WRITER_KEYS[${num}]} - ADDITIONAL_OPTS=-Dsecor.file.reader.writer.factory=${READER_WRITERS[${num}]} +for key in ${!READER_WRITERS[@]}; do + MESSAGE_TYPE=${key} + ADDITIONAL_OPTS=-Dsecor.file.reader.writer.factory=${READER_WRITERS[${key}]} echo "********************************************************" - echo "Running tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${num}]}" + echo "Running tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${key}]}" post_and_verify_test + if [ ${MESSAGE_TYPE} = "binary" ]; then + # Testing finalizer in partition mode + post_and_finalizer_verify_test hr + post_and_finalizer_verify_test dt + fi start_from_non_zero_offset_test move_offset_back_test if [ ${MESSAGE_TYPE} = "json" ]; then