From bec77cde670419b7281adafe4dbd63364ec9a958 Mon Sep 17 00:00:00 2001
From: Henry Cai
Date: Mon, 13 Jul 2015 19:10:19 -0700
Subject: [PATCH 1/3] Adding support for hourly s3 data ingestion from secor:

Summary:
- Add the capability of partitioning the data files in S3 using hourly folders:
     s3://pinlogs/secor_raw_logs/topic1/dt=2015-07-07/hr=05/
  This way, the data files land on S3 much sooner (hourly vs. daily), people
  can check the results on S3 much sooner, and this also opens the door to a
  faster hourly data pipeline on the Hadoop side as well.  The hr folder
  values range from 00 to 23.
- To trigger the hourly partitioning and finalization, add the following
  parameter in your secor config:
    * message.timestamp.using.hour=true
  And change the upload threshold to less than one hour:
    * secor.max.file.age.seconds=3000
- Change the hive partition registration code to register partitions using
  both the dt and hr columns (it does require the Hive table to be created or
  altered to have both dt and hr as the partition columns)
- The enhancements are done through the following:
  * Change the TimestampedMessageParser to allow it to extract both the dt
    and hr from the timestamp field
  * Change the Finalizer to go through both the hr and dt folders to generate
    the SUCCESS files and do the hive registration
    - the dt finalizer keeps the old lag before finalization: 1 day + 1 hour
    - the hr finalizer lag is defined as: 1 hour
- Added more unit tests on the message parser for hourly behavior
- Added more E2E tests to cover the partitioner and hourly ingestion

Test Plan: Added both unit tests and e2e tests, and tested manually for a new
topic on S3
---
 .../config/secor.dev.hr.partition.properties |  11 ++
 src/main/config/secor.dev.properties         |   4 +-
 .../pinterest/secor/common/SecorConfig.java  |  12 ++
 .../main/TestLogMessageProducerMain.java     |  21 +-
 .../secor/parser/PartitionFinalizer.java     |  91 +++++++--
 .../parser/TimestampedMessageParser.java     |  20 +-
 .../secor/tools/TestLogMessageProducer.java  |  16 +-
 .../pinterest/secor/uploader/Uploader.java   |   2 +
 src/main/scripts/run_tests.sh                | 179 +++++++++++++++---
 .../secor/parser/JsonMessageParserTest.java  |  20 ++
 10 files changed, 326 insertions(+), 50 deletions(-)
 create mode 100644 src/main/config/secor.dev.hr.partition.properties

diff --git a/src/main/config/secor.dev.hr.partition.properties b/src/main/config/secor.dev.hr.partition.properties
new file mode 100644
index 000000000..2b190ae37
--- /dev/null
+++ b/src/main/config/secor.dev.hr.partition.properties
@@ -0,0 +1,11 @@
+include=secor.dev.properties
+
+secor.kafka.group=secor_hr_partition
+secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser
+
+secor.s3.path=secor_dev/hr_partition
+secor.local.path=/tmp/secor_dev/message_logs/hr_partition
+
+message.timestamp.using.hour=true
+
+ostrich.port=9998
diff --git a/src/main/config/secor.dev.properties b/src/main/config/secor.dev.properties
index b1ad32099..ec30afa71 100644
--- a/src/main/config/secor.dev.properties
+++ b/src/main/config/secor.dev.properties
@@ -19,6 +19,6 @@ zookeeper.quorum=localhost:2181

 # Upload policies.
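+# For the hourly ingestion flow described in the commit message, the file age
+# threshold in this section is what forces uploads more often than once per
+# hour; production configs keep it under 3600 (e.g. 3000), while the dev value
+# is shortened much further only so the e2e tests finish quickly.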
# 10K secor.max.file.size.bytes=10000 -# 1 minute -secor.max.file.age.seconds=60 +# 10 seconds +secor.max.file.age.seconds=10 diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index 2e03856f2..d3fe22147 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -211,6 +211,10 @@ public String getMessageTimestampInputPattern() { return getString("message.timestamp.input.pattern"); } + public boolean getMessageTimestampUsingHour() { + return getBoolean("message.timestamp.using.hour", false); + } + public String getHivePrefix() { return getString("secor.hive.prefix"); } @@ -252,6 +256,14 @@ private int getInt(String name) { return mProperties.getInt(name); } + private boolean getBoolean(String name, boolean required) { + if (required) { + checkProperty(name); + return mProperties.getBoolean(name); + } + return mProperties.getBoolean(name, false); + } + private long getLong(String name) { return mProperties.getLong(name); } diff --git a/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java b/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java index 6e768c3df..25c3a86ee 100644 --- a/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java +++ b/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java @@ -63,6 +63,19 @@ private static CommandLine parseArgs(String[] args) throws ParseException { .withArgName("") .withType(String.class) .create("type")); + options.addOption(OptionBuilder.withLongOpt("broker") + .withDescription("broker string, e.g. localhost:9092") + .hasArg() + .withArgName("") + .withType(String.class) + .create("broker")); + options.addOption(OptionBuilder.withLongOpt("timeshift") + .withDescription("message timestamp adjustment in seconds, it will be deducted" + + " from current time") + .hasArg() + .withArgName("") + .withType(Number.class) + .create("timeshift")); CommandLineParser parser = new GnuParser(); return parser.parse(options, args); @@ -74,9 +87,13 @@ public static void main(String[] args) { String topic = commandLine.getOptionValue("topic"); int messages = ((Number) commandLine.getParsedOptionValue("messages")).intValue(); int producers = ((Number) commandLine.getParsedOptionValue("producers")).intValue(); + String broker = commandLine.getOptionValue("broker"); String type = commandLine.getOptionValue("type"); + Number timeshiftNumber = ((Number)commandLine.getParsedOptionValue("timeshift")); + int timeshift = timeshiftNumber == null ? 
0 : timeshiftNumber.intValue();
             for (int i = 0; i < producers; ++i) {
-                TestLogMessageProducer producer = new TestLogMessageProducer(topic, messages, type);
+                TestLogMessageProducer producer = new TestLogMessageProducer(
+                    topic, messages, type, broker, timeshift);
                 producer.start();
             }
         } catch (Throwable t) {
@@ -84,4 +101,4 @@ public static void main(String[] args) {
             System.exit(1);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
index b826db3ae..fd85bbf3b 100644
--- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
+++ b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
@@ -47,6 +47,7 @@ public class PartitionFinalizer {
     private KafkaClient mKafkaClient;
     private QuboleClient mQuboleClient;
     private String mFileExtension;
+    private final boolean usingHourly;

     public PartitionFinalizer(SecorConfig config) throws Exception {
         mConfig = config;
@@ -61,6 +62,7 @@ public PartitionFinalizer(SecorConfig config) throws Exception {
         } else {
             mFileExtension = "";
         }
+        usingHourly = config.getMessageTimestampUsingHour();
     }

     private long getLastTimestampMillis(TopicPartition topicPartition) throws Exception {
@@ -120,21 +122,35 @@ private long getCommittedTimestampMillis(String topic) throws Exception {

     private NavigableSet<Calendar> getPartitions(String topic) throws IOException, ParseException {
         final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path();
-        String[] partitions = {"dt="};
+        String[] partitions = usingHourly ? new String[]{"dt=", "hr="} : new String[]{"dt="};
         LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions,
             mConfig.getGeneration(), 0, 0, mFileExtension);
         String parentDir = logFilePath.getLogFileParentDir();
         String[] partitionDirs = FileUtil.list(parentDir);
-        Pattern pattern = Pattern.compile(".*/dt=(\\d\\d\\d\\d-\\d\\d-\\d\\d)$");
+        if (usingHourly) {
+            List<String> dirs = new ArrayList<String>();
+            for (String partitionDir : partitionDirs) {
+                dirs.addAll(Arrays.asList(FileUtil.list(partitionDir)));
+            }
+            partitionDirs = dirs.toArray(new String[dirs.size()]);
+        }
+        String patternStr = ".*/dt=(\\d\\d\\d\\d-\\d\\d-\\d\\d)";
+        if (usingHourly) {
+            patternStr += "/hr=(\\d\\d)";
+        }
+        patternStr += "$";
+        LOG.info("patternStr: " + patternStr);
+        Pattern pattern = Pattern.compile(patternStr);
         TreeSet<Calendar> result = new TreeSet<Calendar>();
         for (String partitionDir : partitionDirs) {
             Matcher matcher = pattern.matcher(partitionDir);
             if (matcher.find()) {
                 String date = matcher.group(1);
-                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+                String hour = usingHourly ? matcher.group(2) : "00";
+                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH");
                 format.setTimeZone(TimeZone.getTimeZone("UTC"));
                 Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-                calendar.setTime(format.parse(date));
+                calendar.setTime(format.parse(date + "-" + hour));
                 result.add(calendar);
             }
         }
@@ -146,27 +162,74 @@ private void finalizePartitionsUpTo(String topic, Calendar calendar) throws IOEx
         NavigableSet<Calendar> partitionDates =
             getPartitions(topic).headSet(calendar, true).descendingSet();
         final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path();
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-        format.setTimeZone(TimeZone.getTimeZone("UTC"));
+        SimpleDateFormat dtFormat = new SimpleDateFormat("yyyy-MM-dd");
+        SimpleDateFormat hrFormat = new SimpleDateFormat("HH");
+        dtFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        hrFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        NavigableSet<Calendar> finishedDates = new TreeSet<Calendar>();
         for (Calendar partition : partitionDates) {
-            String partitionStr = format.format(partition.getTime());
-            String[] partitions = {"dt=" + partitionStr};
+            String dtPartitionStr = dtFormat.format(partition.getTime());
+            String hrPartitionStr = hrFormat.format(partition.getTime());
+            String[] partitions = usingHourly
+                ? new String[]{"dt=" + dtPartitionStr, "hr=" + hrPartitionStr}
+                : new String[]{"dt=" + dtPartitionStr};
             LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions,
                 mConfig.getGeneration(), 0, 0, mFileExtension);
             String logFileDir = logFilePath.getLogFileDir();
             assert FileUtil.exists(logFileDir) : "FileUtil.exists(" + logFileDir + ")";
             String successFilePath = logFileDir + "/_SUCCESS";
             if (FileUtil.exists(successFilePath)) {
-                return;
+                LOG.info("File exists already, short circuit return. " + successFilePath);
+                break;
             }
             try {
-                mQuboleClient.addPartition(mConfig.getHivePrefix() + topic, "dt='" + partitionStr + "'");
+                String parStr = "dt='" + dtPartitionStr + "'";
+                if (usingHourly) {
+                    parStr += ", hr='" + hrPartitionStr + "'";
+                }
+                String hivePrefix = null;
+                try {
+                    hivePrefix = mConfig.getHivePrefix();
+                } catch (RuntimeException ex) {
+                    LOG.warn("HivePrefix is not defined. Skip hive registration");
+                }
+                if (hivePrefix != null) {
+                    mQuboleClient.addPartition(hivePrefix + topic, parStr);
+                }
             } catch (Exception e) {
-                LOG.error("failed to finalize topic {} partition dt={}", topic , partitionStr, e);
+                LOG.error("failed to finalize topic " + topic
+                        + " partition dt=" + dtPartitionStr + " hr=" + hrPartitionStr,
+                        e);
                 continue;
             }
             LOG.info("touching file {}", successFilePath);
             FileUtil.touch(successFilePath);
+
+            // We need to mark the successFile for the dt folder as well
+            if (usingHourly) {
+                Calendar yesterday = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+                yesterday.setTimeInMillis(partition.getTimeInMillis());
+                yesterday.add(Calendar.DAY_OF_MONTH, -1);
+                finishedDates.add(yesterday);
+            }
+        }
+
+        // Reverse order to enable short circuit return
+        finishedDates = finishedDates.descendingSet();
+        for (Calendar partition : finishedDates) {
+            String dtPartitionStr = dtFormat.format(partition.getTime());
+            String[] partitions = new String[]{"dt=" + dtPartitionStr};
+            LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions,
+                mConfig.getGeneration(), 0, 0, mFileExtension);
+            String logFileDir = logFilePath.getLogFileDir();
+            String successFilePath = logFileDir + "/_SUCCESS";
+            if (FileUtil.exists(successFilePath)) {
+                LOG.info("File exists already, short circuit return. " + successFilePath);
+                break;
+            }
+            LOG.info("touching file " + successFilePath);
+            FileUtil.touch(successFilePath);
+        }
     }
@@ -223,9 +286,11 @@ public void finalizePartitions() throws Exception {
             if (finalizedTimestampMillis != -1) {
                 Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
                 calendar.setTimeInMillis(finalizedTimestampMillis);
-                // Introduce a lag of one day and one hour.
+                if (!usingHourly) {
+                    calendar.add(Calendar.DAY_OF_MONTH, -1);
+                }
+                // Introduce a lag of one hour.
                 calendar.add(Calendar.HOUR, -1);
-                calendar.add(Calendar.DAY_OF_MONTH, -1);
                 finalizePartitionsUpTo(topic, calendar);
             }
         }
diff --git a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
index 74e4b0a02..ff45295a1 100644
--- a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
@@ -25,12 +25,17 @@

 public abstract class TimestampedMessageParser extends MessageParser {

-    private SimpleDateFormat mFormatter;
+    private final SimpleDateFormat dtFormatter;
+    private final SimpleDateFormat hrFormatter;
+    private final boolean usingHourly;

     public TimestampedMessageParser(SecorConfig config) {
         super(config);
-        mFormatter = new SimpleDateFormat("yyyy-MM-dd");
-        mFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        dtFormatter = new SimpleDateFormat("yyyy-MM-dd");
+        dtFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        hrFormatter = new SimpleDateFormat("HH");
+        hrFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        usingHourly = config.getMessageTimestampUsingHour();
     }

     public abstract long extractTimestampMillis(final Message message) throws Exception;
@@ -54,7 +59,12 @@
     public String[] extractPartitions(Message message) throws Exception {
         // Date constructor takes milliseconds since epoch.
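+        // With hourly granularity, a message stamped 2015-07-07T05:xx UTC lands in
+        // {"dt=2015-07-07", "hr=05"}; with the default daily granularity it stays
+        // in {"dt=2015-07-07"}.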
long timestampMillis = extractTimestampMillis(message); Date date = new Date(timestampMillis); - String result[] = {"dt=" + mFormatter.format(date)}; - return result; + String dt = "dt=" + dtFormatter.format(date); + String hr = "hr=" + hrFormatter.format(date); + if (usingHourly) { + return new String[]{dt, hr}; + } else { + return new String[]{dt}; + } } } diff --git a/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java b/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java index fee7070cd..a8eb86023 100644 --- a/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java +++ b/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java @@ -40,16 +40,25 @@ public class TestLogMessageProducer extends Thread { private final String mTopic; private final int mNumMessages; private final String mType; + private final String mMetadataBrokerList; + private final int mTimeshift; - public TestLogMessageProducer(String topic, int numMessages, String type) { + public TestLogMessageProducer(String topic, int numMessages, String type, + String metadataBrokerList, int timeshift) { mTopic = topic; mNumMessages = numMessages; mType = type; + mMetadataBrokerList = metadataBrokerList; + mTimeshift = timeshift; } public void run() { Properties properties = new Properties(); - properties.put("metadata.broker.list", "localhost:9092"); + if (mMetadataBrokerList == null || mMetadataBrokerList.isEmpty()) { + properties.put("metadata.broker.list", "localhost:9092"); + } else { + properties.put("metadata.broker.list", mMetadataBrokerList); + } properties.put("partitioner.class", "com.pinterest.secor.tools.RandomPartitioner"); properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); properties.put("key.serializer.class", "kafka.serializer.StringEncoder"); @@ -69,7 +78,8 @@ public void run() { TSerializer serializer = new TSerializer(protocol); for (int i = 0; i < mNumMessages; ++i) { - TestMessage testMessage = new TestMessage(System.currentTimeMillis() * 1000000L + i, + long time = (System.currentTimeMillis() - mTimeshift * 1000L) * 1000000L + i; + TestMessage testMessage = new TestMessage(time, "some_value_" + i); if (i % 2 == 0) { testMessage.setEnumField(TestEnum.SOME_VALUE); diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java index d98294766..65cb1e617 100644 --- a/src/main/java/com/pinterest/secor/uploader/Uploader.java +++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java @@ -199,6 +199,7 @@ private void trimFiles(TopicPartition topicPartition, long startOffset) throws E private void checkTopicPartition(TopicPartition topicPartition) throws Exception { final long size = mFileRegistry.getSize(topicPartition); final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition); + LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec); if (size >= mConfig.getMaxFileSizeBytes() || modificationAgeSec >= mConfig.getMaxFileAgeSeconds()) { long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition); @@ -206,6 +207,7 @@ private void checkTopicPartition(TopicPartition topicPartition) throws Exception newOffsetCount); long lastSeenOffset = mOffsetTracker.getLastSeenOffset(topicPartition); if (oldOffsetCount == newOffsetCount) { + LOG.debug("Uploading for: " + topicPartition); uploadFiles(topicPartition); } else if (newOffsetCount > lastSeenOffset) { // && oldOffset < newOffset LOG.debug("last seen offset {} is lower than 
committed offset count {}. Deleting files in topic {} partition {}", diff --git a/src/main/scripts/run_tests.sh b/src/main/scripts/run_tests.sh index 6a021ee32..0e032d96c 100755 --- a/src/main/scripts/run_tests.sh +++ b/src/main/scripts/run_tests.sh @@ -38,7 +38,7 @@ PARENT_DIR=/tmp/secor_dev LOGS_DIR=${PARENT_DIR}/logs BUCKET=${SECOR_BUCKET:-test-bucket} S3_LOGS_DIR=s3://${BUCKET}/secor_dev -MESSAGES=1000 +MESSAGES=100 MESSAGE_TYPE=binary # For the compression tests to work, set this to the path of the Hadoop native libs. HADOOP_NATIVE_LIB_PATH=lib @@ -46,13 +46,19 @@ HADOOP_NATIVE_LIB_PATH=lib ADDITIONAL_OPTS= # various reader writer options to be used for testing -declare -A READER_WRITERS -READER_WRITERS[json]=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory -READER_WRITERS[binary]=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory +# +# However older bash (ver <= 3) does not support associative array, falls back +# to use two arrays +# declare -A READER_WRITERS +# +declare -a READER_WRITER_KEYS +READER_WRITER_KEYS=(json binary) +declare -a READER_WRITERS +READER_WRITERS=(com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory) # The minimum wait time is one minute plus delta. Secor is configured to upload files older than # one minute and we need to make sure that everything ends up on s3 before starting verification. -WAIT_TIME=${SECOR_WAIT_TIME:-120} +WAIT_TIME=${SECOR_WAIT_TIME:-30} BASE_DIR=$(dirname $0) CONF_DIR=${BASE_DIR}/.. @@ -73,7 +79,7 @@ check_for_native_libs() { recreate_dirs() { run_command "rm -r -f ${PARENT_DIR}" - if [ -n ${SECOR_LOCAL_S3} ]; then + if [ -n "${SECOR_LOCAL_S3}" ]; then run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del" else run_command "s3cmd del --recursive ${S3_LOGS_DIR}" @@ -85,7 +91,7 @@ recreate_dirs() { } start_s3() { - if [ -n ${SECOR_LOCAL_S3} ]; then + if [ -n "${SECOR_LOCAL_S3}" ]; then if command -v fakes3 > /dev/null 2>&1; then run_command "fakes3 --root=/tmp/fakes3 --port=5000 --hostname=localhost > /tmp/fakes3.log 2>&1 &" sleep 2 @@ -97,7 +103,7 @@ start_s3() { } stop_s3() { - if [ -n ${SECOR_LOCAL_S3} ]; then + if [ -n "${SECOR_LOCAL_S3}" ]; then run_command "pkill -9 'fakes3' > /dev/null 2>&1 || true" run_command "rm -r -f /tmp/fakes3" fi @@ -137,23 +143,46 @@ stop_secor() { run_command "pkill -f 'com.pinterest.secor.main.ConsumerMain' || true" } +run_finalizer() { + run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \ + -Dconfig=secor.test.partition.properties ${ADDITIONAL_OPTS} -cp secor-0.1-SNAPSHOT.jar:lib/* \ + com.pinterest.secor.main.PartitionFinalizerMain > ${LOGS_DIR}/finalizer.log 2>&1 " + + EXIT_CODE=$? 
+    if [ ${EXIT_CODE} -ne 0 ]; then
+        echo -e "\e[1;41;97mFinalizer FAILED\e[0m"
+        echo "See log ${LOGS_DIR}/finalizer.log for more details"
+        exit ${EXIT_CODE}
+    fi
+}
+
 create_topic() {
     run_command "${BASE_DIR}/run_kafka_class.sh kafka.admin.TopicCommand --create --zookeeper \
         localhost:2181 --replication-factor 1 --partitions 2 --topic test > \
         ${LOGS_DIR}/create_topic.log 2>&1"
 }

+# post messages
+# $1 number of messages
+# $2 timeshift in seconds
 post_messages() {
     run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \
-        -Dconfig=secor.test.backup.properties -cp ${CLASSPATH} \
-        com.pinterest.secor.main.TestLogMessageProducerMain -t test -m $1 -p 1 -type ${MESSAGE_TYPE} > \
+        -Dconfig=secor.test.backup.properties -cp secor-0.1-SNAPSHOT.jar:lib/* \
+        com.pinterest.secor.main.TestLogMessageProducerMain -t test -m $1 -p 1 -type ${MESSAGE_TYPE} -timeshift $2 > \
         ${LOGS_DIR}/test_log_message_producer.log 2>&1"
 }

+# verify the messages
+# $1: number of messages
+# $2: number of _SUCCESS files
 verify() {
+    echo "Verifying $1 $2"
+
     RUNMODE_0="backup"
     if [ "${MESSAGE_TYPE}" = "binary" ]; then
         RUNMODE_1="partition"
+    else
+        RUNMODE_1="backup"
     fi
     for RUNMODE in ${RUNMODE_0} ${RUNMODE_1}; do
         run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \
@@ -169,6 +198,21 @@
             stop_s3
             exit ${VERIFICATION_EXIT_CODE}
         fi
+
+        # Verify SUCCESS file
+        if [ -n "${SECOR_LOCAL_S3}" ]; then
+            run_command "s3cmd ls -c ${CONF_DIR}/test.s3cfg -r ${S3_LOGS_DIR} | gr
+ep _SUCCESS | wc -l > /tmp/secor_tests_output.txt"
+        else
+            run_command "s3cmd ls -r ${S3_LOGS_DIR} | grep _SUCCESS | wc -l > /tmp/secor_tests_output.txt"
+        fi
+        count=$(

Date: Thu, 16 Jul 2015 16:01:53 -0700
Subject: [PATCH 2/3] Update based on Pawel's feedback.

Instead of hard-coding the daily/hourly concept in the PartitionFinalizer,
move the concept of partition boundary and partition ordering into a new
interface, Partitioner. The PartitionFinalizer works with the Partitioner to
determine the last partition to finalize up to, and collects all the
partitions previous to that partition from the Partitioner.
---
 src/main/config/secor.prod.properties        |   1 +
 .../pinterest/secor/common/LogFilePath.java  |  14 +-
 .../pinterest/secor/common/SecorConfig.java  |  20 +-
 .../secor/parser/PartitionFinalizer.java     | 326 ++++++------------
 .../pinterest/secor/parser/Partitioner.java  |  97 ++++++
 .../parser/TimestampedMessageParser.java     | 113 +++++-
 src/main/scripts/run_tests.sh                | 115 +++---
 7 files changed, 381 insertions(+), 305 deletions(-)
 create mode 100644 src/main/java/com/pinterest/secor/parser/Partitioner.java

diff --git a/src/main/config/secor.prod.properties b/src/main/config/secor.prod.properties
index 0801a5c99..56047cb57 100644
--- a/src/main/config/secor.prod.properties
+++ b/src/main/config/secor.prod.properties
@@ -37,5 +37,6 @@ zookeeper.quorum=
 # 200MB
 secor.max.file.size.bytes=200000000
 # 1 hour
+# for hourly ingestion/finalization, set this property to a smaller value, e.g.
1800 secor.max.file.age.seconds=3600 diff --git a/src/main/java/com/pinterest/secor/common/LogFilePath.java b/src/main/java/com/pinterest/secor/common/LogFilePath.java index 367dc03f4..753288c0d 100644 --- a/src/main/java/com/pinterest/secor/common/LogFilePath.java +++ b/src/main/java/com/pinterest/secor/common/LogFilePath.java @@ -42,13 +42,13 @@ * @author Pawel Garbacki (pawel@pinterest.com) */ public class LogFilePath { - private String mPrefix; - private String mTopic; - private String[] mPartitions; - private int mGeneration; - private int mKafkaPartition; - private long mOffset; - private String mExtension; + private final String mPrefix; + private final String mTopic; + private final String[] mPartitions; + private final int mGeneration; + private final int mKafkaPartition; + private final long mOffset; + private final String mExtension; public LogFilePath(String prefix, int generation, long lastCommittedOffset, ParsedMessage message, String extension) { diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index d3fe22147..7d66ddd61 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -215,6 +215,14 @@ public boolean getMessageTimestampUsingHour() { return getBoolean("message.timestamp.using.hour", false); } + public int getFinalizerLagSecond() { + return getInt("finalizer.lag.second", 3600); + } + + public int getFinalizerLookbackPeriods() { + return getInt("finalizer.lookback.periods", 10); + } + public String getHivePrefix() { return getString("secor.hive.prefix"); } @@ -256,12 +264,12 @@ private int getInt(String name) { return mProperties.getInt(name); } - private boolean getBoolean(String name, boolean required) { - if (required) { - checkProperty(name); - return mProperties.getBoolean(name); - } - return mProperties.getBoolean(name, false); + private int getInt(String name, int defaultValue) { + return mProperties.getInt(name, defaultValue); + } + + private boolean getBoolean(String name, boolean defaultValue) { + return mProperties.getBoolean(name, defaultValue); } private long getLong(String name) { diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java index fd85bbf3b..b9297d589 100644 --- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java +++ b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java @@ -25,12 +25,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Stack; /** * Partition finalizer writes _SUCCESS files to date partitions that very likely won't be receiving @@ -41,13 +39,13 @@ public class PartitionFinalizer { private static final Logger LOG = LoggerFactory.getLogger(PartitionFinalizer.class); - private SecorConfig mConfig; - private ZookeeperConnector mZookeeperConnector; - private TimestampedMessageParser mMessageParser; - private KafkaClient mKafkaClient; - private QuboleClient mQuboleClient; - private String mFileExtension; - private final boolean usingHourly; + private final SecorConfig mConfig; + private final ZookeeperConnector mZookeeperConnector; + private final TimestampedMessageParser 
mMessageParser;
+    private final KafkaClient mKafkaClient;
+    private final QuboleClient mQuboleClient;
+    private final String mFileExtension;
+    private final int mLookbackPeriods;

     public PartitionFinalizer(SecorConfig config) throws Exception {
         mConfig = config;
@@ -62,216 +60,123 @@ public PartitionFinalizer(SecorConfig config) throws Exception {
         } else {
             mFileExtension = "";
         }
-        usingHourly = config.getMessageTimestampUsingHour();
+        mLookbackPeriods = config.getFinalizerLookbackPeriods();
+        LOG.info("Lookback periods: " + mLookbackPeriods);
     }

-    private long getLastTimestampMillis(TopicPartition topicPartition) throws Exception {
-        Message message = mKafkaClient.getLastMessage(topicPartition);
-        if (message == null) {
-            // This will happen if no messages have been posted to the given topic partition.
-            LOG.error("No message found for topic {} partition {}" + topicPartition.getTopic(), topicPartition.getPartition());
-            return -1;
-        }
-        return mMessageParser.extractTimestampMillis(message);
-    }
-
-    private long getLastTimestampMillis(String topic) throws Exception {
-        final int numPartitions = mKafkaClient.getNumPartitions(topic);
-        long max_timestamp = Long.MIN_VALUE;
-        for (int partition = 0; partition < numPartitions; ++partition) {
-            TopicPartition topicPartition = new TopicPartition(topic, partition);
-            long timestamp = getLastTimestampMillis(topicPartition);
-            if (timestamp > max_timestamp) {
-                max_timestamp = timestamp;
-            }
-        }
-        if (max_timestamp == Long.MIN_VALUE) {
-            return -1;
-        }
-        return max_timestamp;
-    }
-
-    private long getCommittedTimestampMillis(TopicPartition topicPartition) throws Exception {
-        Message message = mKafkaClient.getCommittedMessage(topicPartition);
-        if (message == null) {
-            LOG.error("No message found for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition());
-            return -1;
-        }
-        return mMessageParser.extractTimestampMillis(message);
-    }
-
-    private long getCommittedTimestampMillis(String topic) throws Exception {
+    private String[] getFinalizedUptoPartitions(String topic) throws Exception {
         final int numPartitions = mKafkaClient.getNumPartitions(topic);
-        long minTimestamp = Long.MAX_VALUE;
+        List<Message> lastMessages = new ArrayList<Message>(numPartitions);
+        List<Message> committedMessages = new ArrayList<Message>(numPartitions);
         for (int partition = 0; partition < numPartitions; ++partition) {
             TopicPartition topicPartition = new TopicPartition(topic, partition);
-            long timestamp = getCommittedTimestampMillis(topicPartition);
-            if (timestamp == -1) {
-                return -1;
-            } else {
-                if (timestamp < minTimestamp) {
-                    minTimestamp = timestamp;
-                }
+            Message lastMessage = mKafkaClient.getLastMessage(topicPartition);
+            Message committedMessage = mKafkaClient.getCommittedMessage(topicPartition);
+            if (lastMessage == null || committedMessage == null) {
+                // This will happen if no messages have been posted to the given topic partition.
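+                // Skip this kafka partition: finalization can still proceed using
+                // the timestamps from the partitions that do have messages.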
+ LOG.error("For topic {} partition {}, lastMessage: {}, commmitted: {}", + topicPartition.getTopic(), topicPartition.getPartition(), + lastMessage, committedMessage); + continue; } + lastMessages.add(lastMessage); + committedMessages.add(committedMessage); } - if (minTimestamp == Long.MAX_VALUE) { - return -1; - } - return minTimestamp; + return mMessageParser.getFinalizedUptoPartitions(lastMessages, committedMessages); } - private NavigableSet getPartitions(String topic) throws IOException, ParseException { + private void finalizePartitionsUpTo(String topic, String[] partitions) throws Exception { final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path(); - String[] partitions = usingHourly ? new String[]{"dt=", "hr="} : new String[]{"dt="}; - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, - mConfig.getGeneration(), 0, 0, mFileExtension); - String parentDir = logFilePath.getLogFileParentDir(); - String[] partitionDirs = FileUtil.list(parentDir); - if (usingHourly) { - List dirs = new ArrayList(); - for (String partionDir : partitionDirs) { - dirs.addAll(Arrays.asList(FileUtil.list(partionDir))); - } - partitionDirs = dirs.toArray(new String[dirs.size()]); - } - String patternStr = ".*/dt=(\\d\\d\\d\\d-\\d\\d-\\d\\d)"; - if (usingHourly) { - patternStr += "/hr=(\\d\\d)"; - } - patternStr += "$"; - LOG.info("patternStr: " + patternStr); - Pattern pattern = Pattern.compile(patternStr); - TreeSet result = new TreeSet(); - for (String partitionDir : partitionDirs) { - Matcher matcher = pattern.matcher(partitionDir); - if (matcher.find()) { - String date = matcher.group(1); - String hour = usingHourly ? matcher.group(2) : "00"; - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH"); - format.setTimeZone(TimeZone.getTimeZone("UTC")); - Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - calendar.setTime(format.parse(date + "-" + hour)); - result.add(calendar); - } - } - return result; - } - private void finalizePartitionsUpTo(String topic, Calendar calendar) throws IOException, - ParseException, InterruptedException { - NavigableSet partitionDates = - getPartitions(topic).headSet(calendar, true).descendingSet(); - final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path(); - SimpleDateFormat dtFormat = new SimpleDateFormat("yyyy-MM-dd"); - SimpleDateFormat hrFormat = new SimpleDateFormat("HH"); - dtFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - hrFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - NavigableSet finishedDates = new TreeSet(); - for (Calendar partition : partitionDates) { - String dtPartitionStr = dtFormat.format(partition.getTime()); - String hrPartitionStr = hrFormat.format(partition.getTime()); - String[] partitions = usingHourly - ? new String[]{"dt=" + dtPartitionStr, "hr=" + hrPartitionStr} - : new String[]{"dt=" + dtPartitionStr}; - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, - mConfig.getGeneration(), 0, 0, mFileExtension); - String logFileDir = logFilePath.getLogFileDir(); - assert FileUtil.exists(logFileDir) : "FileUtil.exists(" + logFileDir + ")"; - String successFilePath = logFileDir + "/_SUCCESS"; - if (FileUtil.exists(successFilePath)) { - LOG.info("File exist already, short circuit return. 
" + successFilePath); - break; - } - try { - String parStr = "dt='" + dtPartitionStr + "'"; - if (usingHourly) { - parStr += ", hr='" + hrPartitionStr + "'"; - } - String hivePrefix = null; - try { - hivePrefix = mConfig.getHivePrefix(); - } catch (RuntimeException ex) { - LOG.warn("HivePrefix is not defined. Skip hive registration"); - } - if (hivePrefix != null) { - mQuboleClient.addPartition(hivePrefix + topic, parStr); + // Walk through each dimension of the partitions array + // First finalize the hourly partitions, and then move on to daily partitions + for (int dim = partitions.length; dim > 0; dim--) { + String[] uptoPartitions = Arrays.copyOf(partitions, dim); + LOG.info("Finalize up to (but not include) {} for dim: {}", uptoPartitions, dim); + + String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions); + Stack toBeFinalized = new Stack(); + // Walk backwards to collect all partitions which are previous to the upTo partition + // Do not include the upTo partition + // Stop at the first partition which already have the SUCCESS file + for (int i = 0; i < mLookbackPeriods; i++) { + LOG.info("Looking for partition: " + Arrays.toString(previous)); + LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous, + mConfig.getGeneration(), 0, 0, mFileExtension); + String logFileDir = logFilePath.getLogFileDir(); + if (FileUtil.exists(logFileDir)) { + String successFilePath = logFileDir + "/_SUCCESS"; + if (FileUtil.exists(successFilePath)) { + LOG.info( + "SuccessFile exist already, short circuit return. " + successFilePath); + break; + } + LOG.info("Folder {} exists and ready to be finalized.", logFileDir); + toBeFinalized.push(previous); + } else { + LOG.info("Folder {} doesn't exist, skip", logFileDir); } - } catch (Exception e) { - LOG.error("failed to finalize topic " + topic - + " partition dt=" + dtPartitionStr + " hr=" + hrPartitionStr, - e); - continue; + previous = mMessageParser.getPreviousPartitions(previous); } - LOG.info("touching file {}", successFilePath); - FileUtil.touch(successFilePath); - - // We need to mark the successFile for the dt folder as well - if (usingHourly) { - Calendar yesterday = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - yesterday.setTimeInMillis(partition.getTimeInMillis()); - yesterday.add(Calendar.DAY_OF_MONTH, -1); - finishedDates.add(yesterday); + LOG.info("To be finalized partitions: {}", toBeFinalized); + if (toBeFinalized.isEmpty()) { + LOG.warn("There is no partitions to be finalized for dim: {}", dim); + continue; } - } - // Reverse order to enable short circuit return - finishedDates = finishedDates.descendingSet(); - for (Calendar partition : finishedDates) { - String dtPartitionStr = dtFormat.format(partition.getTime()); - String[] partitions = new String[]{"dt=" + dtPartitionStr}; - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, - mConfig.getGeneration(), 0, 0, mFileExtension); - String logFileDir = logFilePath.getLogFileDir(); - String successFilePath = logFileDir + "/_SUCCESS"; - if (FileUtil.exists(successFilePath)) { - LOG.info("File exist already, short circuit return. 
" + successFilePath); - break; - } - LOG.info("touching file " + successFilePath); - FileUtil.touch(successFilePath); - } - } + // Now walk forward the collected partitions to do the finalization + // Note we are deliberately walking backwards and then forwards to make sure we don't + // end up in a situation that a later date partition is finalized and then the system + // crashes (which creates unfinalized partition folders in between) + while (!toBeFinalized.isEmpty()) { + String[] current = toBeFinalized.pop(); + LOG.info("Finalizing partition: " + Arrays.toString(current)); + // We only perform hive registration on the last dimension of the partition array + // i.e. only do hive registration for the hourly folder, but not for the daily + if (dim == partitions.length) { + try { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < current.length; i++) { + String par = current[i]; + // We expect the partition array in the form of key=value if + // they need to go through hive registration + String[] parts = par.split("="); + assert parts.length == 2 : "wrong partition format: " + par; + if (i > 0) { + sb.append(","); + } + sb.append(parts[0]); + sb.append("='"); + sb.append(parts[1]); + sb.append("'"); + } + LOG.info("Hive partition string: " + sb); + String hivePrefix = null; + try { + hivePrefix = mConfig.getHivePrefix(); + } catch (RuntimeException ex) { + LOG.warn("HivePrefix is not defined. Skip hive registration"); + } + if (hivePrefix != null) { + mQuboleClient.addPartition(hivePrefix + topic, sb.toString()); + } + } catch (Exception e) { + LOG.error("failed to finalize topic " + topic, e); + continue; + } + } - /** - * Get finalized timestamp for a given topic partition. Finalized timestamp is the current time - * if the last offset for that topic partition has been committed earlier than an hour ago. - * Otherwise, finalized timestamp is the committed timestamp. - * - * @param topicPartition The topic partition for which we want to compute the finalized - * timestamp. - * @return The finalized timestamp for the topic partition. 
-     * @throws Exception
-     */
-    private long getFinalizedTimestampMillis(TopicPartition topicPartition) throws Exception {
-        long lastTimestamp = getLastTimestampMillis(topicPartition);
-        long committedTimestamp = getCommittedTimestampMillis(topicPartition);
-        long now = System.currentTimeMillis();
-        if (lastTimestamp == committedTimestamp && (now - lastTimestamp) > 3600 * 1000) {
-            return now;
-        }
-        return committedTimestamp;
-    }
+            // Generate the SUCCESS file at the end
+            LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current,
+                mConfig.getGeneration(), 0, 0, mFileExtension);
+            String logFileDir = logFilePath.getLogFileDir();
+            String successFilePath = logFileDir + "/_SUCCESS";

-    private long getFinalizedTimestampMillis(String topic) throws Exception {
-        final int numPartitions = mKafkaClient.getNumPartitions(topic);
-        long minTimestamp = Long.MAX_VALUE;
-        for (int partition = 0; partition < numPartitions; ++partition) {
-            TopicPartition topicPartition = new TopicPartition(topic, partition);
-            long timestamp = getFinalizedTimestampMillis(topicPartition);
-            LOG.info("finalized timestamp for topic {} partition {} is {}", topic, partition, timestamp);
-            if (timestamp == -1) {
-                return -1;
-            } else {
-                if (timestamp < minTimestamp) {
-                    minTimestamp = timestamp;
-                }
+            LOG.info("touching file {}", successFilePath);
+            FileUtil.touch(successFilePath);
             }
         }
-        if (minTimestamp == Long.MAX_VALUE) {
-            return -1;
-        }
-        return minTimestamp;
     }

     public void finalizePartitions() throws Exception {
@@ -281,17 +186,10 @@ public void finalizePartitions() throws Exception {
             LOG.info("skipping topic {}", topic);
         } else {
             LOG.info("finalizing topic {}", topic);
-            long finalizedTimestampMillis = getFinalizedTimestampMillis(topic);
-            LOG.info("finalized timestamp for topic {} is {}", topic , finalizedTimestampMillis);
-            if (finalizedTimestampMillis != -1) {
-                Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-                calendar.setTimeInMillis(finalizedTimestampMillis);
-                if (!usingHourly) {
-                    calendar.add(Calendar.DAY_OF_MONTH, -1);
-                }
-                // Introduce a lag of one hour.
-                calendar.add(Calendar.HOUR, -1);
-                finalizePartitionsUpTo(topic, calendar);
+            String[] partitions = getFinalizedUptoPartitions(topic);
+            LOG.info("finalized up to partition for topic {} is {}", topic, partitions);
+            if (partitions != null) {
+                finalizePartitionsUpTo(topic, partitions);
             }
         }
     }
diff --git a/src/main/java/com/pinterest/secor/parser/Partitioner.java b/src/main/java/com/pinterest/secor/parser/Partitioner.java
new file mode 100644
index 000000000..d7a4f78e9
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/parser/Partitioner.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.secor.parser;
+
+import com.pinterest.secor.message.Message;
+
+import java.util.List;
+
+/**
+ * The Partitioner knows when to finalize a file folder partition.
+ *
+ * A file folder partition (e.g. dt=2015-07-07) can be finalized when all
+ * messages in that date arrived. The caller (PartitionFinalizer) will do the
+ * finalization work (e.g. generate _SUCCESS file, perform hive registration)
+ *
+ * The partitioner provides the method to calculate the range of file
+ * folder partitions to be finalized and the method to iterate through
+ * the range.
+ *
+ * The caller will first provide a list of last-consumed messages for a given
+ * kafka topic and call #getFinalizedUptoPartitions to get the finalized-up-to
+ * partition and then walk backwards by calling #getPreviousPartitions to
+ * collect all the previous partitions which are ready to be finalized.
+ *
+ * Note that the finalize-up-to partition itself is not inclusive in the range
+ * of partitions to be finalized.
+ *
+ * The caller might repeat this loop multiple times when the filesystem
+ * partition is multi-dimensional (e.g. [dt=2015-07-07,hr=05]). It will loop
+ * once for the hourly folder finalization and another time for the daily
+ * folder.
+ *
+ * Note that although we use daily/hourly partitions to illustrate the use of
+ * the partitioner, it is by no means limited to timestamp-based partitioning;
+ * it should also be able to work with offset-based partitioning as long as we
+ * establish an iterating order within those partitions.
+ *
+ * @author Henry Cai (hcai@pinterest.com)
+ */
+public interface Partitioner {
+    /**
+     * Calculates the partition to finalize-up-to from a list of last-consumed
+     * messages and a list of last-enqueued messages.
+     *
+     * For each kafka topic/partition for a given topic, the caller will provide
+     * two messages:
+     *   * lastMessage: the last message at the tail of the kafka queue
+     *   * committedMessage: the message secor consumed and committed
+     * And then iterating over all the kafka topic partitions for the given
+     * topic, the caller will gather the above two messages into two lists.
+     *
+     * The Partitioner will compare the messages from all kafka partitions to
+     * see which one is the earliest to finalize up to. The partitioner will
+     * normally use the timestamp from the committedMessage to decide
+     * the finalize time. But for some slow topics where there are no new
+     * messages coming for a while (i.e. lastMessage == committedMessage),
+     * the partitioner can use the current time as the finalize time.
+     *
+     * Note that the up-to partition itself is not inclusive in the range to be
+     * finalized. For example, when the last message is in 2015-07-07,
+     * 7/7 itself is not complete yet.
+     *
+     * Note also that the partitioner might want to adjust down the finalize
+     * time to allow a safety lag for late arrival messages, e.g. adding one
+     * extra hour of lag.
+     *
+     * @param lastMessages the last message at the tail of the queue
+     * @param committedMessages the message secor consumed and committed
+     *
+     * @return a String array to represent a file folder partition to finalize up to
+     */
+    String[] getFinalizedUptoPartitions(List<Message> lastMessages,
+                                        List<Message> committedMessages) throws Exception;
+
+    /**
+     * Get the previous partition out of the incoming partition.
+     * E.g. 
for ["dt=2015-07-07","hr=05"], it will return ["dt=2015-07-07","hr=04"] + * + * @param partition + * @return + */ + String[] getPreviousPartitions(String[] partition) throws Exception; +} diff --git a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java index ff45295a1..a13a252db 100644 --- a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java +++ b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java @@ -19,23 +19,36 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimeZone; -public abstract class TimestampedMessageParser extends MessageParser { +public abstract class TimestampedMessageParser extends MessageParser implements Partitioner { + + private static final Logger LOG = LoggerFactory.getLogger(TimestampedMessageParser.class); + + private static final SimpleDateFormat mDtFormatter = new SimpleDateFormat("yyyy-MM-dd"); + private static final SimpleDateFormat mHrFormatter = new SimpleDateFormat("HH"); + private static final SimpleDateFormat mDtHrFormatter = new SimpleDateFormat("yyyy-MM-dd-HH"); + + static { + mDtFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); + mHrFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); + mDtHrFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); + } - private final SimpleDateFormat dtFormatter; - private final SimpleDateFormat hrFormatter; - private final boolean usingHourly; + private final boolean mUsingHourly; + private final long mLagInSeconds; public TimestampedMessageParser(SecorConfig config) { super(config); - dtFormatter = new SimpleDateFormat("yyyy-MM-dd"); - dtFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); - hrFormatter = new SimpleDateFormat("HH"); - hrFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); - usingHourly = config.getMessageTimestampUsingHour(); + mUsingHourly = config.getMessageTimestampUsingHour(); + mLagInSeconds = config.getFinalizerLagSecond(); + LOG.info("UsingHourly: {}, lagInSeconds: {} ", mUsingHourly, mLagInSeconds); } public abstract long extractTimestampMillis(final Message message) throws Exception; @@ -54,17 +67,87 @@ protected static long toMillis(final long timestamp) { return timestampMillis; } + protected String[] generatePartitions(long timestampMillis, boolean usingHourly) + throws Exception { + Date date = new Date(timestampMillis); + String dt = "dt=" + mDtFormatter.format(date); + String hr = "hr=" + mHrFormatter.format(date); + if (usingHourly) { + return new String[]{dt, hr}; + } else { + return new String[]{dt}; + } + } + + protected long parsePartitions(String[] partitions) throws Exception { + String dtValue = partitions[0].split("=")[1]; + String hrValue = partitions.length > 1 ? partitions[1].split("=")[1] : "00"; + String value = dtValue + "-" + hrValue; + Date date = mDtHrFormatter.parse(value); + return date.getTime(); + } + @Override public String[] extractPartitions(Message message) throws Exception { // Date constructor takes milliseconds since epoch. 
         long timestampMillis = extractTimestampMillis(message);
-        Date date = new Date(timestampMillis);
-        String dt = "dt=" + dtFormatter.format(date);
-        String hr = "hr=" + hrFormatter.format(date);
-        if (usingHourly) {
-            return new String[]{dt, hr};
+        return generatePartitions(timestampMillis, mUsingHourly);
+    }
+
+    private long getFinalizedTimestampMillis(Message lastMessage,
+                                             Message committedMessage) throws Exception {
+        long lastTimestamp = extractTimestampMillis(lastMessage);
+        long committedTimestamp = extractTimestampMillis(committedMessage);
+        long now = System.currentTimeMillis();
+        if (lastTimestamp == committedTimestamp && (now - lastTimestamp) > 3600 * 1000) {
+            LOG.info("No new message coming, use the current time: " + now);
+            return now;
+        }
+        return committedTimestamp;
+    }
+
+    @Override
+    public String[] getFinalizedUptoPartitions(List<Message> lastMessages,
+                                               List<Message> committedMessages) throws Exception {
+        if (lastMessages == null || committedMessages == null) {
+            LOG.error("Either {} or {} is null", lastMessages,
+                      committedMessages);
+            return null;
+        }
+        assert lastMessages.size() == committedMessages.size();
+
+        long minMillis = Long.MAX_VALUE;
+        for (int i = 0; i < lastMessages.size(); i++) {
+            long millis = getFinalizedTimestampMillis(lastMessages.get(i),
+                                                      committedMessages.get(i));
+            if (millis < minMillis) {
+                minMillis = millis;
+            }
+        }
+        if (minMillis == Long.MAX_VALUE) {
+            LOG.error("No valid timestamps among messages: {} and {}", lastMessages,
+                      committedMessages);
+            return null;
+        }
+
+        // add the safety lag for late-arrival messages
+        long lag = mLagInSeconds * 1000L;
+        LOG.info("Originally: {}, adjust down {}", minMillis, lag);
+        return generatePartitions(minMillis - lag, mUsingHourly);
+    }
+
+    @Override
+    public String[] getPreviousPartitions(String[] partitions) throws Exception {
+        long millis = parsePartitions(partitions);
+        long delta;
+        if (partitions.length == 1) {
+            delta = 3600L * 24 * 1000L;
+        } else if (partitions.length == 2) {
+            delta = 3600L * 1000L;
         } else {
-            return new String[]{dt};
+            throw new RuntimeException("Unsupported partitions: " + partitions.length);
         }
+        return generatePartitions(millis - delta, partitions.length == 2);
     }
+
 }
diff --git a/src/main/scripts/run_tests.sh b/src/main/scripts/run_tests.sh
index 0e032d96c..abe6c37b5 100755
--- a/src/main/scripts/run_tests.sh
+++ b/src/main/scripts/run_tests.sh
@@ -46,15 +46,11 @@ HADOOP_NATIVE_LIB_PATH=lib
 ADDITIONAL_OPTS=

 # various reader writer options to be used for testing
+# note: associative arrays need bash v4 support
 #
-# However older bash (ver <= 3) does not support associative array, falls back
-# to use two arrays
-# declare -A READER_WRITERS
-#
-declare -a READER_WRITER_KEYS
-READER_WRITER_KEYS=(json binary)
-declare -a READER_WRITERS
-READER_WRITERS=(com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory)
+declare -A READER_WRITERS
+READER_WRITERS[json]=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory
+READER_WRITERS[binary]=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory

 # The minimum wait time is one minute plus delta. Secor is configured to upload files older than
 # one minute and we need to make sure that everything ends up on s3 before starting verification.
@@ -80,9 +76,11 @@ check_for_native_libs() { recreate_dirs() { run_command "rm -r -f ${PARENT_DIR}" if [ -n "${SECOR_LOCAL_S3}" ]; then - run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del" + run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls -r ${S3_LOGS_DIR} | awk '{ print \$4 }' | xargs -L 1 s3cmd -c ${CONF_DIR}/test.s3cfg del" + run_command "s3cmd -c ${CONF_DIR}/test.s3cfg ls -r ${S3_LOGS_DIR}" else run_command "s3cmd del --recursive ${S3_LOGS_DIR}" + run_command "s3cmd ls -r ${S3_LOGS_DIR}" fi # create logs directory if [ ! -d ${LOGS_DIR} ]; then @@ -94,7 +92,7 @@ start_s3() { if [ -n "${SECOR_LOCAL_S3}" ]; then if command -v fakes3 > /dev/null 2>&1; then run_command "fakes3 --root=/tmp/fakes3 --port=5000 --hostname=localhost > /tmp/fakes3.log 2>&1 &" - sleep 2 + sleep 10 run_command "s3cmd -c ${CONF_DIR}/test.s3cfg mb s3://${BUCKET}" else echo "Couldn't find FakeS3 binary, please install it using `gem install fakes3`" @@ -104,7 +102,7 @@ start_s3() { stop_s3() { if [ -n "${SECOR_LOCAL_S3}" ]; then - run_command "pkill -9 'fakes3' > /dev/null 2>&1 || true" + run_command "pkill -f 'fakes3' || true" run_command "rm -r -f /tmp/fakes3" fi } @@ -145,7 +143,7 @@ stop_secor() { run_finalizer() { run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \ - -Dconfig=secor.test.partition.properties ${ADDITIONAL_OPTS} -cp secor-0.1-SNAPSHOT.jar:lib/* \ + -Dconfig=secor.test.partition.properties ${ADDITIONAL_OPTS} -cp $CLASSPATH \ com.pinterest.secor.main.PartitionFinalizerMain > ${LOGS_DIR}/finalizer.log 2>&1 " EXIT_CODE=$? @@ -167,7 +165,7 @@ create_topic() { # $2 timeshift in seconds post_messages() { run_command "${JAVA} -server -ea -Dlog4j.configuration=log4j.dev.properties \ - -Dconfig=secor.test.backup.properties -cp secor-0.1-SNAPSHOT.jar:lib/* \ + -Dconfig=secor.test.backup.properties -cp ${CLASSPATH} \ com.pinterest.secor.main.TestLogMessageProducerMain -t test -m $1 -p 1 -type ${MESSAGE_TYPE} -timeshift $2 > \ ${LOGS_DIR}/test_log_message_producer.log 2>&1" } @@ -194,15 +192,12 @@ verify() { echo -e "\e[1;41;97mVerification FAILED\e[0m" echo "See log ${LOGS_DIR}/log_verifier_${RUNMODE}.log for more details" tail -n 50 ${LOGS_DIR}/log_verifier_${RUNMODE}.log - stop_all - stop_s3 exit ${VERIFICATION_EXIT_CODE} fi # Verify SUCCESS file if [ -n "${SECOR_LOCAL_S3}" ]; then - run_command "s3cmd ls -c ${CONF_DIR}/test.s3cfg -r ${S3_LOGS_DIR} | gr -ep _SUCCESS | wc -l > /tmp/secor_tests_output.txt" + run_command "s3cmd ls -c ${CONF_DIR}/test.s3cfg -r ${S3_LOGS_DIR} | grep _SUCCESS | wc -l > /tmp/secor_tests_output.txt" else run_command "s3cmd ls -r ${S3_LOGS_DIR} | grep _SUCCESS | wc -l > /tmp/secor_tests_output.txt" fi @@ -275,7 +270,6 @@ post_and_verify_test() { # Post some messages and run the finalizer, count # of messages and success file # $1: hr or dt, decides whether it's hourly or daily folder finalization -# $2: number of messages post_and_finalizer_verify_test() { echo "********************************************************" date=$(date -u +'%Y-%m-%d %H:%M:%S') @@ -291,41 +285,32 @@ post_and_finalizer_verify_test() { return fi + HOUR_TIMESHIFT=$((3600+3600)) + DAY_TIMESHIFT=$((86400+3600)) + + OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS} + if [ $1 = "hr" ]; then - ADDITIONAL_OPTS="-Dsecor.file.reader.writer.factory=${READER_WRITERS[${num}]} -Dmessage.timestamp.using.hour=true" - # for hr folder, the finalizer lag is 1 hour - TIMESHIFT=$((3600+1200)) - if [ $2 -ne 0 ]; then - # should be 2 
success files, one for hr folder, - # one for yesterday's dt folder - FILES=2 - else - # should be 0 success files - FILES=0 - fi + ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dmessage.timestamp.using.hour=true -Dfinalizer.lookback.periods=30" + # should be 2 success files for hr folder, 1 for dt folder + FILES=3 else - ADDITIONAL_OPTS="-Dsecor.file.reader.writer.factory=${READER_WRITERS[${num}]}" - # for dt folder, the finalizer lag is 1 day + 1 hour - TIMESHIFT=$((86400+3600+1200)) - if [ $2 -ne 0 ]; then - # should be 1 success files - FILES=1 - else - # should be 0 success files - FILES=0 - fi + # should be 1 success files for dt folder + FILES=1 fi echo "Expected success file: $FILES" - echo "running post_and_finalizer_verify_test $1 $2" + echo "running post_and_finalizer_verify_test $1" initialize start_secor sleep 3 - # post some older messages - post_messages $2 ${TIMESHIFT} - # post some newer messages + # post some messages for yesterday + post_messages ${MESSAGES} ${DAY_TIMESHIFT} + # post some messages for last hour + post_messages ${MESSAGES} ${HOUR_TIMESHIFT} + # post some current messages post_messages ${MESSAGES} 0 echo "Waiting ${WAIT_TIME} sec for Secor to upload logs to s3" @@ -334,9 +319,11 @@ post_and_finalizer_verify_test() { echo "start finalizer" run_finalizer - verify $((0+$2+${MESSAGES})) ${FILES} + verify $((${MESSAGES}*3)) ${FILES} stop_all + ADDITIONAL_OPTS=${OLD_ADDITIONAL_OPTS} + echo -e "\e[1;42;97mpost_and_finalizer_verify_test succeeded\e[0m" } @@ -364,18 +351,23 @@ move_offset_back_test() { echo "running move_offset_back_test" initialize + OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS} + ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dsecor.max.file.age.seconds=30" + start_secor sleep 3 post_messages $((${MESSAGES}/10)) 0 set_offsets_in_zookeeper 2 post_messages $((${MESSAGES}*9/10)) 0 - echo "Waiting ${WAIT_TIME} sec for Secor to upload logs to s3" - sleep ${WAIT_TIME} + echo "Waiting $((${WAIT_TIME}*3)) sec for Secor to upload logs to s3" + sleep $((${WAIT_TIME}*3)) # 4 because we skipped 2 messages per topic partition and there are 2 partitions per topic. 
verify $((${MESSAGES}-4)) 0
 
     stop_all
 
+    ADDITIONAL_OPTS=${OLD_ADDITIONAL_OPTS}
+
     echo -e "\e[1;42;97mmove_offset_back_test succeeded\e[0m"
 }
 
@@ -385,6 +377,8 @@ post_and_verify_compressed_test() {
     echo "running post_and_verify_compressed_test"
     initialize
 
+    OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS}
+
     # add compression options
     ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dsecor.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
         -Djava.library.path=$HADOOP_NATIVE_LIB_PATH"
@@ -396,31 +390,26 @@ post_and_verify_compressed_test() {
     verify ${MESSAGES} 0
 
     stop_all
 
+    ADDITIONAL_OPTS=${OLD_ADDITIONAL_OPTS}
+
     echo -e "\e[1;42;97mpost_and_verify_compressed_test succeeded\e[0m"
 }
 
 check_for_native_libs
+stop_s3
 start_s3
-# Testing finalizer in partition mode
-num=1
-MESSAGE_TYPE=${READER_WRITER_KEYS[${num}]}
-echo "********************************************************"
-echo "Running hourly finalizer tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${num}]}"
-post_and_finalizer_verify_test hr 0
-post_and_finalizer_verify_test hr 100
-
-echo "********************************************************"
-echo "Running daily finalizer tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${num}]}"
-post_and_finalizer_verify_test dt 0
-post_and_finalizer_verify_test dt 100
-
-for num in 0 ${#READER_WRITERS[@]}-1; do
-    MESSAGE_TYPE=${READER_WRITER_KEYS[${num}]}
-    ADDITIONAL_OPTS=-Dsecor.file.reader.writer.factory=${READER_WRITERS[${num}]}
+for key in ${!READER_WRITERS[@]}; do
+    MESSAGE_TYPE=${key}
+    ADDITIONAL_OPTS=-Dsecor.file.reader.writer.factory=${READER_WRITERS[${key}]}
     echo "********************************************************"
-    echo "Running tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${num}]}"
+    echo "Running tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${key}]}"
    post_and_verify_test
+    if [ ${MESSAGE_TYPE} = "binary" ]; then
+        # Testing finalizer in partition mode
+        post_and_finalizer_verify_test hr
+        post_and_finalizer_verify_test dt
+    fi
    start_from_non_zero_offset_test
    move_offset_back_test
    if [ ${MESSAGE_TYPE} = "json" ]; then

From 32f19e025bc6cd0189893bfc518844ac9e3b31d2 Mon Sep 17 00:00:00 2001
From: Henry Cai
Date: Thu, 16 Jul 2015 17:16:24 -0700
Subject: [PATCH 3/3] Adding support for hourly s3 data ingestion from secor:

Summary:
- Add the capability of partitioning the data files in S3 using hourly
  folders:
     s3://pinlogs/secor_raw_logs/topic1/dt=2015-07-07/hr=05/

  This way, the data files will be on S3 much sooner (hourly instead of
  daily), so people can check the results on S3 much sooner, and it also
  opens the door to a faster hourly data pipeline on the Hadoop side.
  The hr folder values range from 00 to 23.

- To trigger the hourly partitioning and finalization, add the following
  parameter to your secor config:
    * partitioner.granularity.hour=true
  And change the upload threshold to less than one hour:
    * secor.max.file.age.seconds=3000

- Change the hive partition registration code to register partitions using
  both the dt and hr columns (it does require the Hive table to be created
  or altered to have both dt and hr as the partition columns)

- The enhancements are done through the following:
  * Introduce a new interface Partitioner which knows what the last
    partition period to be finalized is (generating the SUCCESS file) and
    how to find the previous partition periods to finalize
* Change the TimestampedMessageParser to implement Partitioner, allowing
    it to extract both the dt and hr from the timestamp field and to
    traverse backwards to find the previous partitions
  * Change the Finalizer to work with the Partitioner to loop through the
    list of ready-to-be-finalized partitions for both the hr and dt folders
    to generate the SUCCESS file and do hive registration

- Added more unit tests for the hourly behavior of the message parser
- Added more E2E tests to cover the partitioner and hourly ingestion

Test Plan: Added both unit tests and e2e tests, and tested manually for a
new topic on S3
---
 src/main/config/secor.common.properties          |  16 ++
 .../config/secor.dev.hr.partition.properties     |   2 +-
 .../pinterest/secor/common/SecorConfig.java      |  18 +-
 .../secor/parser/PartitionFinalizer.java         | 155 +++++++++---------
 .../pinterest/secor/parser/Partitioner.java      |   7 +
 .../parser/TimestampedMessageParser.java         |  47 ++++--
 src/main/scripts/run_tests.sh                    |  12 +-
 .../secor/parser/JsonMessageParserTest.java      | 101 +++++++++++-
 8 files changed, 239 insertions(+), 119 deletions(-)

diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties
index f6c9c2f9b..1c296212d 100644
--- a/src/main/config/secor.common.properties
+++ b/src/main/config/secor.common.properties
@@ -86,6 +86,22 @@ secor.offsets.per.partition=10000000
 # How long does it take for secor to forget a topic partition. Applies to stats generation only.
 secor.topic_partition.forget.seconds=600
 
+# Set the partitioner to use hourly partitioning
+# By default, the partitioner partitions the data daily, so the data will be
+# written into
+#    s3n://.../topic/dt=2015-07-07/
+# If this parameter is set to true, the data will be written into
+#    s3n://.../topic/dt=2015-07-07/hr=02
+# The hour folder ranges from 00 to 23
+# partitioner.granularity.hour=true
+
+# During partition finalization, the finalizer will start from the latest
+# time partition (e.g. dt=2015-07-17) and traverse backwards for n
+# partition periods (e.g. dt=2015-07-16, dt=2015-07-15 ...)
+# This parameter controls how many partition periods to traverse back
+# The default is 10
+# secor.finalizer.lookback.periods=10
+
 # If greater than 0, upon startup Secor will clean up directories and files under secor.local.path
 # that are older than this value.
secor.local.log.delete.age.hours=-1
diff --git a/src/main/config/secor.dev.hr.partition.properties b/src/main/config/secor.dev.hr.partition.properties
index 2b190ae37..95c7017c2 100644
--- a/src/main/config/secor.dev.hr.partition.properties
+++ b/src/main/config/secor.dev.hr.partition.properties
@@ -6,6 +6,6 @@ secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser
 secor.s3.path=secor_dev/hr_partition
 secor.local.path=/tmp/secor_dev/message_logs/hr_partition
 
-message.timestamp.using.hour=true
+partitioner.granularity.hour=true
 
 ostrich.port=9998
diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java
index 7d66ddd61..23621750c 100644
--- a/src/main/java/com/pinterest/secor/common/SecorConfig.java
+++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -211,16 +211,8 @@ public String getMessageTimestampInputPattern() {
         return getString("message.timestamp.input.pattern");
     }
 
-    public boolean getMessageTimestampUsingHour() {
-        return getBoolean("message.timestamp.using.hour", false);
-    }
-
-    public int getFinalizerLagSecond() {
-        return getInt("finalizer.lag.second", 3600);
-    }
-
     public int getFinalizerLookbackPeriods() {
-        return getInt("finalizer.lookback.periods", 10);
+        return getInt("secor.finalizer.lookback.periods", 10);
     }
 
     public String getHivePrefix() {
@@ -247,6 +239,10 @@ public String getZookeeperPath() {
         return getString("secor.zookeeper.path");
     }
 
+    public boolean getBoolean(String name, boolean defaultValue) {
+        return mProperties.getBoolean(name, defaultValue);
+    }
+
     private void checkProperty(String name) {
         if (!mProperties.containsKey(name)) {
             throw new RuntimeException("Failed to find required configuration option '" +
@@ -268,10 +264,6 @@ private int getInt(String name, int defaultValue) {
         return mProperties.getInt(name, defaultValue);
     }
 
-    private boolean getBoolean(String name, boolean defaultValue) {
-        return mProperties.getBoolean(name, defaultValue);
-    }
-
     private long getLong(String name) {
         return mProperties.getLong(name);
     }
diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
index b9297d589..58e12fa68 100644
--- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
+++ b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
@@ -85,98 +85,95 @@ private String[] getFinalizedUptoPartitions(String topic) throws Exception {
         return mMessageParser.getFinalizedUptoPartitions(lastMessages, committedMessages);
     }
 
-    private void finalizePartitionsUpTo(String topic, String[] partitions) throws Exception {
+    private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throws Exception {
         final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path();
 
-        // Walk through each dimension of the partitions array
-        // First finalize the hourly partitions, and then move on to daily partitions
-        for (int dim = partitions.length; dim > 0; dim--) {
-            String[] uptoPartitions = Arrays.copyOf(partitions, dim);
-            LOG.info("Finalize up to (but not include) {} for dim: {}", uptoPartitions, dim);
+        LOG.info("Finalize up to (but not including) {}, dim: {}",
+            uptoPartitions, uptoPartitions.length);
 
-            String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions);
-            Stack<String[]> toBeFinalized = new Stack<String[]>();
-            // Walk backwards to collect all partitions which are previous to the upTo partition
-            // Do not include the upTo partition
-            // Stop at the 
first partition which already have the SUCCESS file
-            for (int i = 0; i < mLookbackPeriods; i++) {
-                LOG.info("Looking for partition: " + Arrays.toString(previous));
-                LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous,
-                    mConfig.getGeneration(), 0, 0, mFileExtension);
-                String logFileDir = logFilePath.getLogFileDir();
-                if (FileUtil.exists(logFileDir)) {
-                    String successFilePath = logFileDir + "/_SUCCESS";
-                    if (FileUtil.exists(successFilePath)) {
-                        LOG.info(
-                            "SuccessFile exist already, short circuit return. " + successFilePath);
-                        break;
-                    }
-                    LOG.info("Folder {} exists and ready to be finalized.", logFileDir);
-                    toBeFinalized.push(previous);
-                } else {
-                    LOG.info("Folder {} doesn't exist, skip", logFileDir);
-                }
-                previous = mMessageParser.getPreviousPartitions(previous);
-            }
+        String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions);
+        Stack<String[]> toBeFinalized = new Stack<String[]>();
+        // Walk backwards to collect all partitions which are previous to the upTo partition
+        // Do not include the upTo partition
+        // Stop at the first partition that already has the SUCCESS file
+        for (int i = 0; i < mLookbackPeriods; i++) {
+            LOG.info("Looking for partition: " + Arrays.toString(previous));
+            LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous,
+                mConfig.getGeneration(), 0, 0, mFileExtension);
+            String logFileDir = logFilePath.getLogFileDir();
+            if (FileUtil.exists(logFileDir)) {
+                String successFilePath = logFileDir + "/_SUCCESS";
+                if (FileUtil.exists(successFilePath)) {
+                    LOG.info(
+                        "SUCCESS file exists already, short circuit return. " + successFilePath);
+                    break;
+                }
+                LOG.info("Folder {} exists and is ready to be finalized.", logFileDir);
+                toBeFinalized.push(previous);
+            } else {
+                LOG.info("Folder {} doesn't exist, skip", logFileDir);
+            }
+            previous = mMessageParser.getPreviousPartitions(previous);
+        }
 
-            LOG.info("To be finalized partitions: {}", toBeFinalized);
-            if (toBeFinalized.isEmpty()) {
-                LOG.warn("There is no partitions to be finalized for dim: {}", dim);
-                continue;
-            }
+        LOG.info("To be finalized partitions: {}", toBeFinalized);
+        if (toBeFinalized.isEmpty()) {
+            LOG.warn("There are no partitions to be finalized.");
+            return;
+        }
 
-            // Now walk forward the collected partitions to do the finalization
-            // Note we are deliberately walking backwards and then forwards to make sure we don't
-            // end up in a situation that a later date partition is finalized and then the system
-            // crashes (which creates unfinalized partition folders in between)
-            while (!toBeFinalized.isEmpty()) {
-                String[] current = toBeFinalized.pop();
-                LOG.info("Finalizing partition: " + Arrays.toString(current));
-                // We only perform hive registration on the last dimension of the partition array
-                // i.e. only do hive registration for the hourly folder, but not for the daily
-                if (dim == partitions.length) {
-                    try {
-                        StringBuilder sb = new StringBuilder();
-                        for (int i = 0; i < current.length; i++) {
-                            String par = current[i];
-                            // We expect the partition array in the form of key=value if
-                            // they need to go through hive registration
-                            String[] parts = par.split("=");
-                            assert parts.length == 2 : "wrong partition format: " + par;
-                            if (i > 0) {
-                                sb.append(",");
-                            }
-                            sb.append(parts[0]);
-                            sb.append("='");
-                            sb.append(parts[1]);
-                            sb.append("'");
-                        }
-                        LOG.info("Hive partition string: " + sb);
-                        String hivePrefix = null;
-                        try {
-                            hivePrefix = mConfig.getHivePrefix();
-                        } catch (RuntimeException ex) {
-                            LOG.warn("HivePrefix is not defined. 
Skip hive registration");
-                        }
-                        if (hivePrefix != null) {
-                            mQuboleClient.addPartition(hivePrefix + topic, sb.toString());
-                        }
-                    } catch (Exception e) {
-                        LOG.error("failed to finalize topic " + topic, e);
-                        continue;
-                    }
-                }
+        // Now walk forward the collected partitions to do the finalization
+        // Note we are deliberately walking backwards and then forwards to make sure we don't
+        // end up in a situation where a later date partition is finalized and then the system
+        // crashes (which creates unfinalized partition folders in between)
+        while (!toBeFinalized.isEmpty()) {
+            String[] current = toBeFinalized.pop();
+            LOG.info("Finalizing partition: " + Arrays.toString(current));
+            // We only perform hive registration on the last dimension of the partition array
+            // i.e. only do hive registration for the hourly folder, but not for the daily one
+            if (uptoPartitions.length == current.length) {
+                try {
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < current.length; i++) {
+                        String par = current[i];
+                        // We expect the partition array in the form of key=value if
+                        // they need to go through hive registration
+                        String[] parts = par.split("=");
+                        assert parts.length == 2 : "wrong partition format: " + par;
+                        if (i > 0) {
+                            sb.append(",");
+                        }
+                        sb.append(parts[0]);
+                        sb.append("='");
+                        sb.append(parts[1]);
+                        sb.append("'");
+                    }
+                    LOG.info("Hive partition string: " + sb);
+                    String hivePrefix = null;
+                    try {
+                        hivePrefix = mConfig.getHivePrefix();
+                    } catch (RuntimeException ex) {
+                        LOG.warn("HivePrefix is not defined. Skipping hive registration");
+                    }
+                    if (hivePrefix != null) {
+                        mQuboleClient.addPartition(hivePrefix + topic, sb.toString());
+                    }
+                } catch (Exception e) {
+                    LOG.error("failed to finalize topic " + topic, e);
+                    continue;
+                }
+            }
 
-                // Generate the SUCCESS file at the end
-                LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current,
-                    mConfig.getGeneration(), 0, 0, mFileExtension);
-                String logFileDir = logFilePath.getLogFileDir();
-                String successFilePath = logFileDir + "/_SUCCESS";
+            // Generate the SUCCESS file at the end
+            LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current,
+                mConfig.getGeneration(), 0, 0, mFileExtension);
+            String logFileDir = logFilePath.getLogFileDir();
+            String successFilePath = logFileDir + "/_SUCCESS";
 
-                LOG.info("touching file {}", successFilePath);
-                FileUtil.touch(successFilePath);
-            }
-        }
+            LOG.info("touching file {}", successFilePath);
+            FileUtil.touch(successFilePath);
+        }
     }
 
     public void finalizePartitions() throws Exception {
diff --git a/src/main/java/com/pinterest/secor/parser/Partitioner.java b/src/main/java/com/pinterest/secor/parser/Partitioner.java
index d7a4f78e9..6253758e0 100644
--- a/src/main/java/com/pinterest/secor/parser/Partitioner.java
+++ b/src/main/java/com/pinterest/secor/parser/Partitioner.java
@@ -90,6 +90,13 @@ String[] getFinalizedUptoPartitions(List<Message> lastMessages,
      * Get the previous partition out of the incoming partition.
      * E.g. for ["dt=2015-07-07","hr=05"], it will return ["dt=2015-07-07","hr=04"]
      *
+     * Note that the implementation might return the previous sequence in daily/mixed forms, e.g.
+     * [dt=2015-07-07, hr=01]
+     * [dt=2015-07-07, hr=00]
+     * [dt=2015-07-06]            <-- dt folder in between
+     * [dt=2015-07-06, hr=23]
+     * [dt=2015-07-06, hr=22]
+     *
      * @param partition
      * @return
      */
diff --git a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
index a13a252db..c0f330044 100644
--- a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
@@ -31,6 +31,9 @@ public abstract class TimestampedMessageParser extends MessageParser implements
 
     private static final Logger LOG = LoggerFactory.getLogger(TimestampedMessageParser.class);
 
+    private static final long HOUR_IN_MILLIS = 3600L * 1000L;
+    private static final long DAY_IN_MILLIS = 3600L * 24 * 1000L;
+
     private static final SimpleDateFormat mDtFormatter = new SimpleDateFormat("yyyy-MM-dd");
     private static final SimpleDateFormat mHrFormatter = new SimpleDateFormat("HH");
     private static final SimpleDateFormat mDtHrFormatter = new SimpleDateFormat("yyyy-MM-dd-HH");
@@ -42,17 +45,19 @@ public abstract class TimestampedMessageParser extends MessageParser implements
     }
 
     private final boolean mUsingHourly;
-    private final long mLagInSeconds;
 
     public TimestampedMessageParser(SecorConfig config) {
         super(config);
-        mUsingHourly = config.getMessageTimestampUsingHour();
-        mLagInSeconds = config.getFinalizerLagSecond();
-        LOG.info("UsingHourly: {}, lagInSeconds: {} ", mUsingHourly, mLagInSeconds);
+        mUsingHourly = usingHourly(config);
+        LOG.info("UsingHourly: {}", mUsingHourly);
     }
 
     public abstract long extractTimestampMillis(final Message message) throws Exception;
 
+    static boolean usingHourly(SecorConfig config) {
+        return config.getBoolean("partitioner.granularity.hour", false);
+    }
+
     protected static long toMillis(final long timestamp) {
         final long nanosecondDivider = (long) Math.pow(10, 9 + 9);
         final long millisecondDivider = (long) Math.pow(10, 9 + 3);
@@ -131,23 +136,37 @@ public String[] getFinalizedUptoPartitions(List<Message> lastMessages,
         }
 
         // add the safety lag for late-arrival messages
-        long lag = mLagInSeconds * 1000L;
-        LOG.info("Originally: {}, adjust down {}", minMillis, lag);
-        return generatePartitions(minMillis - lag, mUsingHourly);
+        minMillis -= 3600L * 1000L;
+        LOG.info("adjusted millis {}", minMillis);
+        return generatePartitions(minMillis, mUsingHourly);
     }
 
     @Override
     public String[] getPreviousPartitions(String[] partitions) throws Exception {
         long millis = parsePartitions(partitions);
-        long delta;
-        if (partitions.length == 1) {
-            delta = 3600L * 24 * 1000L;
-        } else if (partitions.length == 2) {
-            delta = 3600L * 1000L;
+        boolean usingHourly = mUsingHourly;
+        if (mUsingHourly && millis % DAY_IN_MILLIS == 0) {
+            // On the day boundary, if the current partition is [dt=07-07, hr=00], the previous
+            // one is dt=07-06; if the current one is [dt=07-06], the previous one is
+            // [dt=07-06, hr=23]
+            // So we would return in the order of:
+            //   dt=07-07, hr=01
+            //   dt=07-07, hr=00
+            //   dt=07-06
+            //   dt=07-06, hr=23
+            if (partitions.length == 2) {
+                usingHourly = false;
+                millis -= DAY_IN_MILLIS;
+            } else {
+                usingHourly = true;
+                millis += DAY_IN_MILLIS;
+                millis -= HOUR_IN_MILLIS;
+            }
         } else {
-            throw new RuntimeException("Unsupported partitions: " + partitions.length);
+            long delta = mUsingHourly ? 
HOUR_IN_MILLIS : DAY_IN_MILLIS; + millis -= delta; } - return generatePartitions(millis - delta, partitions.length == 2); + return generatePartitions(millis, usingHourly); } } diff --git a/src/main/scripts/run_tests.sh b/src/main/scripts/run_tests.sh index 2944b4724..d42f7a8dd 100755 --- a/src/main/scripts/run_tests.sh +++ b/src/main/scripts/run_tests.sh @@ -54,7 +54,7 @@ READER_WRITERS[binary]=com.pinterest.secor.io.impl.SequenceFileReaderWriterFacto # The minimum wait time is one minute plus delta. Secor is configured to upload files older than # one minute and we need to make sure that everything ends up on s3 before starting verification. -WAIT_TIME=${SECOR_WAIT_TIME:-30} +WAIT_TIME=${SECOR_WAIT_TIME:-120} BASE_DIR=$(dirname $0) CONF_DIR=${BASE_DIR}/.. @@ -192,6 +192,10 @@ verify() { echo -e "\e[1;41;97mVerification FAILED\e[0m" echo "See log ${LOGS_DIR}/log_verifier_${RUNMODE}.log for more details" tail -n 50 ${LOGS_DIR}/log_verifier_${RUNMODE}.log + echo "See log ${LOGS_DIR}/secor_${RUNMODE}.log for more details" + tail -n 50 ${LOGS_DIR}/secor_${RUNMODE}.log + echo "See log ${LOGS_DIR}/test_log_message_producer.log for more details" + tail -n 50 ${LOGS_DIR}/test_log_message_producer.log exit ${VERIFICATION_EXIT_CODE} fi @@ -288,7 +292,7 @@ post_and_finalizer_verify_test() { OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS} if [ $1 = "hr" ]; then - ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dmessage.timestamp.using.hour=true -Dfinalizer.lookback.periods=30" + ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dpartitioner.granularity.hour=true -Dsecor.finalizer.lookback.periods=30" # should be 2 success files for hr folder, 1 for dt folder FILES=3 else @@ -357,8 +361,8 @@ move_offset_back_test() { set_offsets_in_zookeeper 2 post_messages $((${MESSAGES}*9/10)) 0 - echo "Waiting $((${WAIT_TIME}*3)) sec for Secor to upload logs to s3" - sleep $((${WAIT_TIME}*3)) + echo "Waiting $((${WAIT_TIME}*2)) sec for Secor to upload logs to s3" + sleep $((${WAIT_TIME}*2)) # 4 because we skipped 2 messages per topic partition and there are 2 partitions per topic. verify $((${MESSAGES}-4)) 0 diff --git a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java index f8157f9f4..96c0cc687 100644 --- a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java @@ -18,12 +18,17 @@ import com.pinterest.secor.common.*; import com.pinterest.secor.message.Message; + +import java.util.Arrays; +import java.util.List; import junit.framework.TestCase; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.powermock.modules.junit4.PowerMockRunner; +import java.util.ArrayList; + @RunWith(PowerMockRunner.class) public class JsonMessageParserTest extends TestCase { @@ -39,15 +44,15 @@ public void setUp() throws Exception { Mockito.when(mConfig.getMessageTimestampName()).thenReturn("timestamp"); byte messageWithSecondsTimestamp[] = - "{\"timestamp\":\"1405970352\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. 
Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); + "{\"timestamp\":\"1405911096\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); mMessageWithSecondsTimestamp = new Message("test", 0, 0, messageWithSecondsTimestamp); byte messageWithMillisTimestamp[] = - "{\"timestamp\":\"1405970352123\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); + "{\"timestamp\":\"1405911096123\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. 
Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); mMessageWithMillisTimestamp = new Message("test", 0, 0, messageWithMillisTimestamp); byte messageWithMillisFloatTimestamp[] = - "{\"timestamp\":\"1405970352123.0\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); + "{\"timestamp\":\"1405911096123.0\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! 
You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
         mMessageWithMillisFloatTimestamp = new Message("test", 0, 0, messageWithMillisFloatTimestamp);
 
         byte messageWithoutTimestamp[] =
@@ -59,9 +64,9 @@ public void setUp() throws Exception {
     public void testExtractTimestampMillis() throws Exception {
         JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
 
-        assertEquals(1405970352000l, jsonMessageParser.extractTimestampMillis(mMessageWithSecondsTimestamp));
-        assertEquals(1405970352123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisTimestamp));
-        assertEquals(1405970352123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisFloatTimestamp));
+        assertEquals(1405911096000l, jsonMessageParser.extractTimestampMillis(mMessageWithSecondsTimestamp));
+        assertEquals(1405911096123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisTimestamp));
+        assertEquals(1405911096123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisFloatTimestamp));
 
         // Return 0 if there's no timestamp, for any reason.
 
@@ -101,11 +106,11 @@ public void testExtractPartitions() throws Exception {
 
     @Test
     public void testExtractHourlyPartitions() throws Exception {
-        Mockito.when(mConfig.getMessageTimestampUsingHour()).thenReturn(true);
+        Mockito.when(mConfig.getBoolean("partitioner.granularity.hour", false)).thenReturn(true);
         JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
 
         String expectedDtPartition = "dt=2014-07-21";
-        String expectedHrPartition = "hr=19";
+        String expectedHrPartition = "hr=02";
 
         String resultSeconds[] = jsonMessageParser.extractPartitions(mMessageWithSecondsTimestamp);
         assertEquals(2, resultSeconds.length);
@@ -118,4 +123,84 @@ public void testExtractHourlyPartitions() throws Exception {
         assertEquals(expectedHrPartition, resultMillis[1]);
     }
 
+    @Test
+    public void testDailyGetFinalizedUptoPartitions() throws Exception {
+        JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
+
+        List<Message> lastMessages = new ArrayList<Message>();
+        lastMessages.add(mMessageWithSecondsTimestamp);
+        List<Message> committedMessages = new ArrayList<Message>();
+        committedMessages.add(mMessageWithMillisTimestamp);
+        String uptoPartitions[] = jsonMessageParser.getFinalizedUptoPartitions(lastMessages,
+            committedMessages);
+        assertEquals(1, uptoPartitions.length);
+        assertEquals("dt=2014-07-21", uptoPartitions[0]);
+
+        String[] previous = jsonMessageParser.getPreviousPartitions(uptoPartitions);
+        assertEquals(1, previous.length);
+        assertEquals("dt=2014-07-20", previous[0]);
+    }
+
+    @Test
+    public void testHourlyGetFinalizedUptoPartitions() throws Exception {
+        Mockito.when(mConfig.getBoolean("partitioner.granularity.hour", false)).thenReturn(true);
+        JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
+
+        List<Message> lastMessages = new ArrayList<Message>();
+        lastMessages.add(mMessageWithSecondsTimestamp);
+        List<Message> committedMessages = new ArrayList<Message>();
+        committedMessages.add(mMessageWithMillisTimestamp);
+        String uptoPartitions[] = jsonMessageParser.getFinalizedUptoPartitions(lastMessages,
+            committedMessages);
+        assertEquals(2, uptoPartitions.length);
+        assertEquals("dt=2014-07-21", uptoPartitions[0]);
+        assertEquals("hr=01", uptoPartitions[1]);
+
+        String[][] expectedPartitions = new String[][] {
+            new String[]{"dt=2014-07-21", "hr=00"},
+            new String[]{"dt=2014-07-20"},  // the dt folder for the previous day comes in between
+            new String[]{"dt=2014-07-20", "hr=23"},
+            new String[]{"dt=2014-07-20", "hr=22"},
+            new String[]{"dt=2014-07-20", "hr=21"},
+            new String[]{"dt=2014-07-20", 
"hr=20"}, + new String[]{"dt=2014-07-20", "hr=19"}, + new String[]{"dt=2014-07-20", "hr=18"}, + new String[]{"dt=2014-07-20", "hr=17"}, + new String[]{"dt=2014-07-20", "hr=16"}, + new String[]{"dt=2014-07-20", "hr=15"}, + new String[]{"dt=2014-07-20", "hr=14"}, + new String[]{"dt=2014-07-20", "hr=13"}, + new String[]{"dt=2014-07-20", "hr=12"}, + new String[]{"dt=2014-07-20", "hr=11"}, + new String[]{"dt=2014-07-20", "hr=10"}, + new String[]{"dt=2014-07-20", "hr=09"}, + new String[]{"dt=2014-07-20", "hr=08"}, + new String[]{"dt=2014-07-20", "hr=07"}, + new String[]{"dt=2014-07-20", "hr=06"}, + new String[]{"dt=2014-07-20", "hr=05"}, + new String[]{"dt=2014-07-20", "hr=04"}, + new String[]{"dt=2014-07-20", "hr=03"}, + new String[]{"dt=2014-07-20", "hr=02"}, + new String[]{"dt=2014-07-20", "hr=01"}, + new String[]{"dt=2014-07-20", "hr=00"}, + new String[]{"dt=2014-07-19"}, // there is day partition for 2nd last day + new String[]{"dt=2014-07-19", "hr=23"} + }; + + String[] partitions = uptoPartitions; + List partitionsList = new ArrayList(); + for (int i = 0; i < 28; i++ ) { + String[] previous = jsonMessageParser.getPreviousPartitions(partitions); + partitionsList.add(previous); + partitions = previous; + } + + assertEquals(partitionsList.size(), expectedPartitions.length); + for (int i = 0; i < partitionsList.size(); i++) { + List expectedPartition = Arrays.asList(expectedPartitions[i]); + List retrievedPartition = Arrays.asList(partitionsList.get(i)); + assertEquals(expectedPartition, retrievedPartition); + } + } + }