diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties index f6c9c2f9b..1c296212d 100644 --- a/src/main/config/secor.common.properties +++ b/src/main/config/secor.common.properties @@ -86,6 +86,22 @@ secor.offsets.per.partition=10000000 # How long does it take for secor to forget a topic partition. Applies to stats generation only. secor.topic_partition.forget.seconds=600 +# Setting the partitioner to use hourly partition +# By default, the partitioner will do daily partition, so the data will be +# written into +# s3n://.../topic/dt=2015-07-07/ +# If this parameter is set to true, the data will be written into +# s3n://.../topic/dt=2015-07-07/hr=02 +# The hour folder ranges from 00 to 23 +# partitioner.granularity.hour=true + +# During partition finalization, the finalizer will start from the last +# time partition (e.g. dt=2015-07-17) and traverse backwards for n +# partition periods (e.g. dt=2015-07-16, dt=2015-07-15 ...) +# This parameter controls how many partition periods to traverse back +# The default is 10 +# secor.finalizer.lookback.periods=10 + # If greater than 0, upon starup Secor will clean up directories and files under secor.local.path # that are older than this value. 
secor.local.log.delete.age.hours=-1 diff --git a/src/main/config/secor.dev.hr.partition.properties b/src/main/config/secor.dev.hr.partition.properties index 2b190ae37..95c7017c2 100644 --- a/src/main/config/secor.dev.hr.partition.properties +++ b/src/main/config/secor.dev.hr.partition.properties @@ -6,6 +6,6 @@ secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser secor.s3.path=secor_dev/hr_partition secor.local.path=/tmp/secor_dev/message_logs/hr_partition -message.timestamp.using.hour=true +partitioner.granularity.hour=true ostrich.port=9998 diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index 7d66ddd61..23621750c 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -211,16 +211,8 @@ public String getMessageTimestampInputPattern() { return getString("message.timestamp.input.pattern"); } - public boolean getMessageTimestampUsingHour() { - return getBoolean("message.timestamp.using.hour", false); - } - - public int getFinalizerLagSecond() { - return getInt("finalizer.lag.second", 3600); - } - public int getFinalizerLookbackPeriods() { - return getInt("finalizer.lookback.periods", 10); + return getInt("secor.finalizer.lookback.periods", 10); } public String getHivePrefix() { @@ -247,6 +239,10 @@ public String getZookeeperPath() { return getString("secor.zookeeper.path"); } + public boolean getBoolean(String name, boolean defaultValue) { + return mProperties.getBoolean(name, defaultValue); + } + private void checkProperty(String name) { if (!mProperties.containsKey(name)) { throw new RuntimeException("Failed to find required configuration option '" + @@ -268,10 +264,6 @@ private int getInt(String name, int defaultValue) { return mProperties.getInt(name, defaultValue); } - private boolean getBoolean(String name, boolean defaultValue) { - return mProperties.getBoolean(name, 
defaultValue); - } - private long getLong(String name) { return mProperties.getLong(name); } diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java index b9297d589..58e12fa68 100644 --- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java +++ b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java @@ -85,98 +85,95 @@ private String[] getFinalizedUptoPartitions(String topic) throws Exception { return mMessageParser.getFinalizedUptoPartitions(lastMessages, committedMessages); } - private void finalizePartitionsUpTo(String topic, String[] partitions) throws Exception { + private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throws Exception { final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path(); - // Walk through each dimension of the partitions array - // First finalize the hourly partitions, and then move on to daily partitions - for (int dim = partitions.length; dim > 0; dim--) { - String[] uptoPartitions = Arrays.copyOf(partitions, dim); - LOG.info("Finalize up to (but not include) {} for dim: {}", uptoPartitions, dim); + LOG.info("Finalize up to (but not include) {}, dim: {}", + uptoPartitions, uptoPartitions.length); - String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions); - Stack toBeFinalized = new Stack(); - // Walk backwards to collect all partitions which are previous to the upTo partition - // Do not include the upTo partition - // Stop at the first partition which already have the SUCCESS file - for (int i = 0; i < mLookbackPeriods; i++) { - LOG.info("Looking for partition: " + Arrays.toString(previous)); - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous, - mConfig.getGeneration(), 0, 0, mFileExtension); - String logFileDir = logFilePath.getLogFileDir(); - if (FileUtil.exists(logFileDir)) { - String successFilePath = logFileDir + "/_SUCCESS"; - if 
(FileUtil.exists(successFilePath)) { - LOG.info( - "SuccessFile exist already, short circuit return. " + successFilePath); - break; - } - LOG.info("Folder {} exists and ready to be finalized.", logFileDir); - toBeFinalized.push(previous); - } else { - LOG.info("Folder {} doesn't exist, skip", logFileDir); + String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions); + Stack toBeFinalized = new Stack(); + // Walk backwards to collect all partitions which are previous to the upTo partition + // Do not include the upTo partition + // Stop at the first partition which already have the SUCCESS file + for (int i = 0; i < mLookbackPeriods; i++) { + LOG.info("Looking for partition: " + Arrays.toString(previous)); + LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous, + mConfig.getGeneration(), 0, 0, mFileExtension); + String logFileDir = logFilePath.getLogFileDir(); + if (FileUtil.exists(logFileDir)) { + String successFilePath = logFileDir + "/_SUCCESS"; + if (FileUtil.exists(successFilePath)) { + LOG.info( + "SuccessFile exist already, short circuit return. 
" + successFilePath); + break; } - previous = mMessageParser.getPreviousPartitions(previous); + LOG.info("Folder {} exists and ready to be finalized.", logFileDir); + toBeFinalized.push(previous); + } else { + LOG.info("Folder {} doesn't exist, skip", logFileDir); } + previous = mMessageParser.getPreviousPartitions(previous); + } - LOG.info("To be finalized partitions: {}", toBeFinalized); - if (toBeFinalized.isEmpty()) { - LOG.warn("There is no partitions to be finalized for dim: {}", dim); - continue; - } + LOG.info("To be finalized partitions: {}", toBeFinalized); + if (toBeFinalized.isEmpty()) { + LOG.warn("There is no partitions to be finalized."); + return; + } - // Now walk forward the collected partitions to do the finalization - // Note we are deliberately walking backwards and then forwards to make sure we don't - // end up in a situation that a later date partition is finalized and then the system - // crashes (which creates unfinalized partition folders in between) - while (!toBeFinalized.isEmpty()) { - String[] current = toBeFinalized.pop(); - LOG.info("Finalizing partition: " + Arrays.toString(current)); - // We only perform hive registration on the last dimension of the partition array - // i.e. only do hive registration for the hourly folder, but not for the daily - if (dim == partitions.length) { - try { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < current.length; i++) { - String par = current[i]; - // We expect the partition array in the form of key=value if - // they need to go through hive registration - String[] parts = par.split("="); - assert parts.length == 2 : "wrong partition format: " + par; - if (i > 0) { - sb.append(","); - } - sb.append(parts[0]); - sb.append("='"); - sb.append(parts[1]); - sb.append("'"); - } - LOG.info("Hive partition string: " + sb); - String hivePrefix = null; - try { - hivePrefix = mConfig.getHivePrefix(); - } catch (RuntimeException ex) { - LOG.warn("HivePrefix is not defined. 
Skip hive registration"); + // Now walk forward the collected partitions to do the finalization + // Note we are deliberately walking backwards and then forwards to make sure we don't + // end up in a situation that a later date partition is finalized and then the system + // crashes (which creates unfinalized partition folders in between) + while (!toBeFinalized.isEmpty()) { + String[] current = toBeFinalized.pop(); + LOG.info("Finalizing partition: " + Arrays.toString(current)); + // We only perform hive registration on the last dimension of the partition array + // i.e. only do hive registration for the hourly folder, but not for the daily + if (uptoPartitions.length == current.length) { + try { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < current.length; i++) { + String par = current[i]; + // We expect the partition array in the form of key=value if + // they need to go through hive registration + String[] parts = par.split("="); + assert parts.length == 2 : "wrong partition format: " + par; + if (i > 0) { + sb.append(","); } - if (hivePrefix != null) { - mQuboleClient.addPartition(hivePrefix + topic, sb.toString()); - } - } catch (Exception e) { - LOG.error("failed to finalize topic " + topic, e); - continue; + sb.append(parts[0]); + sb.append("='"); + sb.append(parts[1]); + sb.append("'"); + } + LOG.info("Hive partition string: " + sb); + String hivePrefix = null; + try { + hivePrefix = mConfig.getHivePrefix(); + } catch (RuntimeException ex) { + LOG.warn("HivePrefix is not defined. 
Skip hive registration"); + } + if (hivePrefix != null) { + mQuboleClient.addPartition(hivePrefix + topic, sb.toString()); } + } catch (Exception e) { + LOG.error("failed to finalize topic " + topic, e); + continue; } + } - // Generate the SUCCESS file at the end - LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current, - mConfig.getGeneration(), 0, 0, mFileExtension); - String logFileDir = logFilePath.getLogFileDir(); - String successFilePath = logFileDir + "/_SUCCESS"; + // Generate the SUCCESS file at the end + LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current, + mConfig.getGeneration(), 0, 0, mFileExtension); + String logFileDir = logFilePath.getLogFileDir(); + String successFilePath = logFileDir + "/_SUCCESS"; - LOG.info("touching file {}", successFilePath); - FileUtil.touch(successFilePath); - } + LOG.info("touching file {}", successFilePath); + FileUtil.touch(successFilePath); } + } public void finalizePartitions() throws Exception { diff --git a/src/main/java/com/pinterest/secor/parser/Partitioner.java b/src/main/java/com/pinterest/secor/parser/Partitioner.java index d7a4f78e9..6253758e0 100644 --- a/src/main/java/com/pinterest/secor/parser/Partitioner.java +++ b/src/main/java/com/pinterest/secor/parser/Partitioner.java @@ -90,6 +90,13 @@ String[] getFinalizedUptoPartitions(List lastMessages, * Get the previous partition out of the incoming partition. * E.g. for ["dt=2015-07-07","hr=05"], it will return ["dt=2015-07-07","hr=04"] * + * Note that the implementation might return the previous sequence in daily/mixed forms, e.g. 
+ * [dt=2015-07-07, hr=01] + * [dt=2015-07-07, hr=00] + * [dt=2015-07-06] <-- dt folder in between + * [dt=2015-07-06, hr=23] + * [dt=2015-07-06, hr=22] + * * @param partition * @return */ diff --git a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java index a13a252db..c0f330044 100644 --- a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java +++ b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java @@ -31,6 +31,9 @@ public abstract class TimestampedMessageParser extends MessageParser implements private static final Logger LOG = LoggerFactory.getLogger(TimestampedMessageParser.class); + private static final long HOUR_IN_MILLIS = 3600L * 1000L; + private static final long DAY_IN_MILLIS = 3600L * 24 * 1000L; + private static final SimpleDateFormat mDtFormatter = new SimpleDateFormat("yyyy-MM-dd"); private static final SimpleDateFormat mHrFormatter = new SimpleDateFormat("HH"); private static final SimpleDateFormat mDtHrFormatter = new SimpleDateFormat("yyyy-MM-dd-HH"); @@ -42,17 +45,19 @@ public abstract class TimestampedMessageParser extends MessageParser implements } private final boolean mUsingHourly; - private final long mLagInSeconds; public TimestampedMessageParser(SecorConfig config) { super(config); - mUsingHourly = config.getMessageTimestampUsingHour(); - mLagInSeconds = config.getFinalizerLagSecond(); - LOG.info("UsingHourly: {}, lagInSeconds: {} ", mUsingHourly, mLagInSeconds); + mUsingHourly = usingHourly(config); + LOG.info("UsingHourly: {}", mUsingHourly); } public abstract long extractTimestampMillis(final Message message) throws Exception; + static boolean usingHourly(SecorConfig config) { + return config.getBoolean("partitioner.granularity.hour", false); + } + protected static long toMillis(final long timestamp) { final long nanosecondDivider = (long) Math.pow(10, 9 + 9); final long millisecondDivider = (long) 
Math.pow(10, 9 + 3); @@ -131,23 +136,37 @@ public String[] getFinalizedUptoPartitions(List lastMessages, } // add the safety lag for late-arrival messages - long lag = mLagInSeconds * 1000L; - LOG.info("Originally: {}, adjust down {}", minMillis, lag); - return generatePartitions(minMillis - lag, mUsingHourly); + minMillis -= 3600L * 1000L; + LOG.info("adjusted millis {}", minMillis); + return generatePartitions(minMillis, mUsingHourly); } @Override public String[] getPreviousPartitions(String[] partitions) throws Exception { long millis = parsePartitions(partitions); - long delta; - if (partitions.length == 1) { - delta = 3600L * 24 * 1000L; - } else if (partitions.length == 2) { - delta = 3600L * 1000L; + boolean usingHourly = mUsingHourly; + if (mUsingHourly && millis % DAY_IN_MILLIS == 0) { + // On the day boundary, if the current partition is [dt=07-07, hr=00], the previous + // one is dt=07-06; If the current one is [dt=07-06], the previous one is + // [dt=07-06, hr=23] + // So we would return in the order of: + // dt=07-07, hr=01 + // dt=07-07, hr=00 + // dt=07-06 + // dt=07-06, hr=23 + if (partitions.length == 2 ) { + usingHourly = false; + millis -= DAY_IN_MILLIS; + } else { + usingHourly = true; + millis += DAY_IN_MILLIS; + millis -= HOUR_IN_MILLIS; + } } else { - throw new RuntimeException("Unsupported partitions: " + partitions.length); + long delta = mUsingHourly ? HOUR_IN_MILLIS : DAY_IN_MILLIS; + millis -= delta; } - return generatePartitions(millis - delta, partitions.length == 2); + return generatePartitions(millis, usingHourly); } } diff --git a/src/main/scripts/run_tests.sh b/src/main/scripts/run_tests.sh index 2944b4724..d42f7a8dd 100755 --- a/src/main/scripts/run_tests.sh +++ b/src/main/scripts/run_tests.sh @@ -54,7 +54,7 @@ READER_WRITERS[binary]=com.pinterest.secor.io.impl.SequenceFileReaderWriterFacto # The minimum wait time is one minute plus delta. 
Secor is configured to upload files older than # one minute and we need to make sure that everything ends up on s3 before starting verification. -WAIT_TIME=${SECOR_WAIT_TIME:-30} +WAIT_TIME=${SECOR_WAIT_TIME:-120} BASE_DIR=$(dirname $0) CONF_DIR=${BASE_DIR}/.. @@ -192,6 +192,10 @@ verify() { echo -e "\e[1;41;97mVerification FAILED\e[0m" echo "See log ${LOGS_DIR}/log_verifier_${RUNMODE}.log for more details" tail -n 50 ${LOGS_DIR}/log_verifier_${RUNMODE}.log + echo "See log ${LOGS_DIR}/secor_${RUNMODE}.log for more details" + tail -n 50 ${LOGS_DIR}/secor_${RUNMODE}.log + echo "See log ${LOGS_DIR}/test_log_message_producer.log for more details" + tail -n 50 ${LOGS_DIR}/test_log_message_producer.log exit ${VERIFICATION_EXIT_CODE} fi @@ -288,7 +292,7 @@ post_and_finalizer_verify_test() { OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS} if [ $1 = "hr" ]; then - ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dmessage.timestamp.using.hour=true -Dfinalizer.lookback.periods=30" + ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dpartitioner.granularity.hour=true -Dsecor.finalizer.lookback.periods=30" # should be 2 success files for hr folder, 1 for dt folder FILES=3 else @@ -357,8 +361,8 @@ move_offset_back_test() { set_offsets_in_zookeeper 2 post_messages $((${MESSAGES}*9/10)) 0 - echo "Waiting $((${WAIT_TIME}*3)) sec for Secor to upload logs to s3" - sleep $((${WAIT_TIME}*3)) + echo "Waiting $((${WAIT_TIME}*2)) sec for Secor to upload logs to s3" + sleep $((${WAIT_TIME}*2)) # 4 because we skipped 2 messages per topic partition and there are 2 partitions per topic. 
verify $((${MESSAGES}-4)) 0 diff --git a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java index f8157f9f4..96c0cc687 100644 --- a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java @@ -18,12 +18,17 @@ import com.pinterest.secor.common.*; import com.pinterest.secor.message.Message; + +import java.util.Arrays; +import java.util.List; import junit.framework.TestCase; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.powermock.modules.junit4.PowerMockRunner; +import java.util.ArrayList; + @RunWith(PowerMockRunner.class) public class JsonMessageParserTest extends TestCase { @@ -39,15 +44,15 @@ public void setUp() throws Exception { Mockito.when(mConfig.getMessageTimestampName()).thenReturn("timestamp"); byte messageWithSecondsTimestamp[] = - "{\"timestamp\":\"1405970352\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! 
You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); + "{\"timestamp\":\"1405911096\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); mMessageWithSecondsTimestamp = new Message("test", 0, 0, messageWithSecondsTimestamp); byte messageWithMillisTimestamp[] = - "{\"timestamp\":\"1405970352123\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. 
Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); + "{\"timestamp\":\"1405911096123\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! 
You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); mMessageWithMillisTimestamp = new Message("test", 0, 0, messageWithMillisTimestamp); byte messageWithMillisFloatTimestamp[] = - "{\"timestamp\":\"1405970352123.0\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); + "{\"timestamp\":\"1405911096123.0\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. 
Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8"); mMessageWithMillisFloatTimestamp = new Message("test", 0, 0, messageWithMillisFloatTimestamp); byte messageWithoutTimestamp[] = @@ -59,9 +64,9 @@ public void setUp() throws Exception { public void testExtractTimestampMillis() throws Exception { JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig); - assertEquals(1405970352000l, jsonMessageParser.extractTimestampMillis(mMessageWithSecondsTimestamp)); - assertEquals(1405970352123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisTimestamp)); - assertEquals(1405970352123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisFloatTimestamp)); + assertEquals(1405911096000l, jsonMessageParser.extractTimestampMillis(mMessageWithSecondsTimestamp)); + assertEquals(1405911096123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisTimestamp)); + assertEquals(1405911096123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisFloatTimestamp)); // Return 0 if there's no timestamp, for any reason. 
@@ -101,11 +106,11 @@ public void testExtractPartitions() throws Exception { @Test public void testExtractHourlyPartitions() throws Exception { - Mockito.when(mConfig.getMessageTimestampUsingHour()).thenReturn(true); + Mockito.when(TimestampedMessageParser.usingHourly(mConfig)).thenReturn(true); JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig); String expectedDtPartition = "dt=2014-07-21"; - String expectedHrPartition = "hr=19"; + String expectedHrPartition = "hr=02"; String resultSeconds[] = jsonMessageParser.extractPartitions(mMessageWithSecondsTimestamp); assertEquals(2, resultSeconds.length); @@ -118,4 +123,84 @@ public void testExtractHourlyPartitions() throws Exception { assertEquals(expectedHrPartition, resultMillis[1]); } + @Test + public void testDailyGetFinalizedUptoPartitions() throws Exception { + JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig); + + List lastMessages = new ArrayList(); + lastMessages.add(mMessageWithSecondsTimestamp); + List committedMessages = new ArrayList(); + committedMessages.add(mMessageWithMillisTimestamp); + String uptoPartitions[] = jsonMessageParser.getFinalizedUptoPartitions(lastMessages, + committedMessages); + assertEquals(1, uptoPartitions.length); + assertEquals("dt=2014-07-21", uptoPartitions[0]); + + String[] previous = jsonMessageParser.getPreviousPartitions(uptoPartitions); + assertEquals(1, previous.length); + assertEquals("dt=2014-07-20", previous[0]); + } + + @Test + public void testHourlyGetFinalizedUptoPartitions() throws Exception { + Mockito.when(TimestampedMessageParser.usingHourly(mConfig)).thenReturn(true); + JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig); + + List lastMessages = new ArrayList(); + lastMessages.add(mMessageWithSecondsTimestamp); + List committedMessages = new ArrayList(); + committedMessages.add(mMessageWithMillisTimestamp); + String uptoPartitions[] = jsonMessageParser.getFinalizedUptoPartitions(lastMessages, + 
committedMessages); + assertEquals(2, uptoPartitions.length); + assertEquals("dt=2014-07-21", uptoPartitions[0]); + assertEquals("hr=01", uptoPartitions[1]); + + String[][] expectedPartitions = new String[][] { + new String[]{"dt=2014-07-21", "hr=00"}, + new String[]{"dt=2014-07-20"}, // there is day partition for previous day + new String[]{"dt=2014-07-20", "hr=23"}, + new String[]{"dt=2014-07-20", "hr=22"}, + new String[]{"dt=2014-07-20", "hr=21"}, + new String[]{"dt=2014-07-20", "hr=20"}, + new String[]{"dt=2014-07-20", "hr=19"}, + new String[]{"dt=2014-07-20", "hr=18"}, + new String[]{"dt=2014-07-20", "hr=17"}, + new String[]{"dt=2014-07-20", "hr=16"}, + new String[]{"dt=2014-07-20", "hr=15"}, + new String[]{"dt=2014-07-20", "hr=14"}, + new String[]{"dt=2014-07-20", "hr=13"}, + new String[]{"dt=2014-07-20", "hr=12"}, + new String[]{"dt=2014-07-20", "hr=11"}, + new String[]{"dt=2014-07-20", "hr=10"}, + new String[]{"dt=2014-07-20", "hr=09"}, + new String[]{"dt=2014-07-20", "hr=08"}, + new String[]{"dt=2014-07-20", "hr=07"}, + new String[]{"dt=2014-07-20", "hr=06"}, + new String[]{"dt=2014-07-20", "hr=05"}, + new String[]{"dt=2014-07-20", "hr=04"}, + new String[]{"dt=2014-07-20", "hr=03"}, + new String[]{"dt=2014-07-20", "hr=02"}, + new String[]{"dt=2014-07-20", "hr=01"}, + new String[]{"dt=2014-07-20", "hr=00"}, + new String[]{"dt=2014-07-19"}, // there is day partition for 2nd last day + new String[]{"dt=2014-07-19", "hr=23"} + }; + + String[] partitions = uptoPartitions; + List partitionsList = new ArrayList(); + for (int i = 0; i < 28; i++ ) { + String[] previous = jsonMessageParser.getPreviousPartitions(partitions); + partitionsList.add(previous); + partitions = previous; + } + + assertEquals(partitionsList.size(), expectedPartitions.length); + for (int i = 0; i < partitionsList.size(); i++) { + List expectedPartition = Arrays.asList(expectedPartitions[i]); + List retrievedPartition = Arrays.asList(partitionsList.get(i)); + assertEquals(expectedPartition, 
retrievedPartition); + } + } + }