Skip to content

Commit

Permalink
Adding support for hourly s3 data ingestion from secor:
Browse files Browse the repository at this point in the history
    Summary:
    - Add the capability of partitioning the data files in S3 using hourly folders:
       s3://pinlogs/secor_raw_logs/topic1/dt=2015-07-07/hr=05/
      This way, the data file will be on S3 much sooner (hourly vs daily), people can check the result on S3 much sooner and this also opens the door to a faster hourly data pipeline on the Hadoop side as well.
      The hr folder values are from 00-23

    - To trigger the hourly partition and finalization, add the following parameter in your secor config:
        * # partitioner.granularity.hour=true
      And change the upload threshold to less than one hour:
        * secor.max.file.age.seconds=3000

    - Change the hive partition registration code to register partition using both dt and hr column (it does require the HIVE table to be created or altered to have both dt and hr as the partition columns)

    - The enhancements are done through the following:
      * Introduce a new interface Partitioner which knows the last partition period to be finalized (generating the SUCCESS file) and how to find the previous partition periods to finalize.
      * Change the TimestampedMessageParser to implement Partitioner, allowing it to extract both the dt and hr from the timestamp field and to traverse backwards to find the previous partitions
      * Change the Finalizer to work with the Partitioner to loop through the list of ready-to-be finalized partitions for both the hr and dt folder to generate the SUCCESS file and do hive registration

    - Added more unit tests on the message parser covering hourly behavior
    - Added more E2E tests to cover the partitioner and hourly ingestion

    Test Plan: Added both unit tests and e2e tests, and tested manually for a new topic on S3
  • Loading branch information
Henry Cai committed Jul 22, 2015
1 parent 60f4629 commit 32f19e0
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 119 deletions.
16 changes: 16 additions & 0 deletions src/main/config/secor.common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ secor.offsets.per.partition=10000000
# How long does it take for secor to forget a topic partition. Applies to stats generation only.
secor.topic_partition.forget.seconds=600

# Setting the partitioner to use hourly partition
# By default, the partitioner will do daily partition, so the data will be
# written into
# s3n://.../topic/dt=2015-07-07/
# If this parameter is set to true, the data will be written into
# s3n://.../topic/dt=2015-07-07/hr=02
# The hour folder ranges from 00 to 23
# partitioner.granularity.hour=true

# During partition finalization, the finalizer will start from the last
# time partition (e.g. dt=2015-07-17) and traverse backwards for n
# partition periods (e.g. dt=2015-07-16, dt=2015-07-15 ...)
# This parameter controls how many partition periods to traverse back
# The default is 10
# secor.finalizer.lookback.periods=10

# If greater than 0, upon startup Secor will clean up directories and files under secor.local.path
# that are older than this value.
secor.local.log.delete.age.hours=-1
Expand Down
2 changes: 1 addition & 1 deletion src/main/config/secor.dev.hr.partition.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser
secor.s3.path=secor_dev/hr_partition
secor.local.path=/tmp/secor_dev/message_logs/hr_partition

message.timestamp.using.hour=true
partitioner.granularity.hour=true

ostrich.port=9998
18 changes: 5 additions & 13 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,8 @@ public String getMessageTimestampInputPattern() {
return getString("message.timestamp.input.pattern");
}

public boolean getMessageTimestampUsingHour() {
return getBoolean("message.timestamp.using.hour", false);
}

public int getFinalizerLagSecond() {
return getInt("finalizer.lag.second", 3600);
}

public int getFinalizerLookbackPeriods() {
return getInt("finalizer.lookback.periods", 10);
return getInt("secor.finalizer.lookback.periods", 10);
}

public String getHivePrefix() {
Expand All @@ -247,6 +239,10 @@ public String getZookeeperPath() {
return getString("secor.zookeeper.path");
}

public boolean getBoolean(String name, boolean defaultValue) {
return mProperties.getBoolean(name, defaultValue);
}

private void checkProperty(String name) {
if (!mProperties.containsKey(name)) {
throw new RuntimeException("Failed to find required configuration option '" +
Expand All @@ -268,10 +264,6 @@ private int getInt(String name, int defaultValue) {
return mProperties.getInt(name, defaultValue);
}

private boolean getBoolean(String name, boolean defaultValue) {
return mProperties.getBoolean(name, defaultValue);
}

private long getLong(String name) {
return mProperties.getLong(name);
}
Expand Down
155 changes: 76 additions & 79 deletions src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,98 +85,95 @@ private String[] getFinalizedUptoPartitions(String topic) throws Exception {
return mMessageParser.getFinalizedUptoPartitions(lastMessages, committedMessages);
}

private void finalizePartitionsUpTo(String topic, String[] partitions) throws Exception {
private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throws Exception {
final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path();

// Walk through each dimension of the partitions array
// First finalize the hourly partitions, and then move on to daily partitions
for (int dim = partitions.length; dim > 0; dim--) {
String[] uptoPartitions = Arrays.copyOf(partitions, dim);
LOG.info("Finalize up to (but not include) {} for dim: {}", uptoPartitions, dim);
LOG.info("Finalize up to (but not include) {}, dim: {}",
uptoPartitions, uptoPartitions.length);

String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions);
Stack<String[]> toBeFinalized = new Stack<String[]>();
// Walk backwards to collect all partitions which are previous to the upTo partition
// Do not include the upTo partition
// Stop at the first partition which already have the SUCCESS file
for (int i = 0; i < mLookbackPeriods; i++) {
LOG.info("Looking for partition: " + Arrays.toString(previous));
LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous,
mConfig.getGeneration(), 0, 0, mFileExtension);
String logFileDir = logFilePath.getLogFileDir();
if (FileUtil.exists(logFileDir)) {
String successFilePath = logFileDir + "/_SUCCESS";
if (FileUtil.exists(successFilePath)) {
LOG.info(
"SuccessFile exist already, short circuit return. " + successFilePath);
break;
}
LOG.info("Folder {} exists and ready to be finalized.", logFileDir);
toBeFinalized.push(previous);
} else {
LOG.info("Folder {} doesn't exist, skip", logFileDir);
String[] previous = mMessageParser.getPreviousPartitions(uptoPartitions);
Stack<String[]> toBeFinalized = new Stack<String[]>();
// Walk backwards to collect all partitions which are previous to the upTo partition
// Do not include the upTo partition
// Stop at the first partition which already have the SUCCESS file
for (int i = 0; i < mLookbackPeriods; i++) {
LOG.info("Looking for partition: " + Arrays.toString(previous));
LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous,
mConfig.getGeneration(), 0, 0, mFileExtension);
String logFileDir = logFilePath.getLogFileDir();
if (FileUtil.exists(logFileDir)) {
String successFilePath = logFileDir + "/_SUCCESS";
if (FileUtil.exists(successFilePath)) {
LOG.info(
"SuccessFile exist already, short circuit return. " + successFilePath);
break;
}
previous = mMessageParser.getPreviousPartitions(previous);
LOG.info("Folder {} exists and ready to be finalized.", logFileDir);
toBeFinalized.push(previous);
} else {
LOG.info("Folder {} doesn't exist, skip", logFileDir);
}
previous = mMessageParser.getPreviousPartitions(previous);
}

LOG.info("To be finalized partitions: {}", toBeFinalized);
if (toBeFinalized.isEmpty()) {
LOG.warn("There is no partitions to be finalized for dim: {}", dim);
continue;
}
LOG.info("To be finalized partitions: {}", toBeFinalized);
if (toBeFinalized.isEmpty()) {
LOG.warn("There is no partitions to be finalized.");
return;
}

// Now walk forward the collected partitions to do the finalization
// Note we are deliberately walking backwards and then forwards to make sure we don't
// end up in a situation that a later date partition is finalized and then the system
// crashes (which creates unfinalized partition folders in between)
while (!toBeFinalized.isEmpty()) {
String[] current = toBeFinalized.pop();
LOG.info("Finalizing partition: " + Arrays.toString(current));
// We only perform hive registration on the last dimension of the partition array
// i.e. only do hive registration for the hourly folder, but not for the daily
if (dim == partitions.length) {
try {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < current.length; i++) {
String par = current[i];
// We expect the partition array in the form of key=value if
// they need to go through hive registration
String[] parts = par.split("=");
assert parts.length == 2 : "wrong partition format: " + par;
if (i > 0) {
sb.append(",");
}
sb.append(parts[0]);
sb.append("='");
sb.append(parts[1]);
sb.append("'");
}
LOG.info("Hive partition string: " + sb);
String hivePrefix = null;
try {
hivePrefix = mConfig.getHivePrefix();
} catch (RuntimeException ex) {
LOG.warn("HivePrefix is not defined. Skip hive registration");
// Now walk forward the collected partitions to do the finalization
// Note we are deliberately walking backwards and then forwards to make sure we don't
// end up in a situation that a later date partition is finalized and then the system
// crashes (which creates unfinalized partition folders in between)
while (!toBeFinalized.isEmpty()) {
String[] current = toBeFinalized.pop();
LOG.info("Finalizing partition: " + Arrays.toString(current));
// We only perform hive registration on the last dimension of the partition array
// i.e. only do hive registration for the hourly folder, but not for the daily
if (uptoPartitions.length == current.length) {
try {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < current.length; i++) {
String par = current[i];
// We expect the partition array in the form of key=value if
// they need to go through hive registration
String[] parts = par.split("=");
assert parts.length == 2 : "wrong partition format: " + par;
if (i > 0) {
sb.append(",");
}
if (hivePrefix != null) {
mQuboleClient.addPartition(hivePrefix + topic, sb.toString());
}
} catch (Exception e) {
LOG.error("failed to finalize topic " + topic, e);
continue;
sb.append(parts[0]);
sb.append("='");
sb.append(parts[1]);
sb.append("'");
}
LOG.info("Hive partition string: " + sb);
String hivePrefix = null;
try {
hivePrefix = mConfig.getHivePrefix();
} catch (RuntimeException ex) {
LOG.warn("HivePrefix is not defined. Skip hive registration");
}
if (hivePrefix != null) {
mQuboleClient.addPartition(hivePrefix + topic, sb.toString());
}
} catch (Exception e) {
LOG.error("failed to finalize topic " + topic, e);
continue;
}
}

// Generate the SUCCESS file at the end
LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current,
mConfig.getGeneration(), 0, 0, mFileExtension);
String logFileDir = logFilePath.getLogFileDir();
String successFilePath = logFileDir + "/_SUCCESS";
// Generate the SUCCESS file at the end
LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current,
mConfig.getGeneration(), 0, 0, mFileExtension);
String logFileDir = logFilePath.getLogFileDir();
String successFilePath = logFileDir + "/_SUCCESS";

LOG.info("touching file {}", successFilePath);
FileUtil.touch(successFilePath);
}
LOG.info("touching file {}", successFilePath);
FileUtil.touch(successFilePath);
}

}

public void finalizePartitions() throws Exception {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/pinterest/secor/parser/Partitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ String[] getFinalizedUptoPartitions(List<Message> lastMessages,
* Get the previous partition out of the incoming partition.
* E.g. for ["dt=2015-07-07","hr=05"], it will return ["dt=2015-07-07","hr=04"]
*
* Note that the implementation might return the previous sequence in daily/mixed forms, e.g.
* [dt=2015-07-07, hr=01]
* [dt=2015-07-07, hr=00]
 * [dt=2015-07-06] <-- dt folder in between
 * [dt=2015-07-06, hr=23]
 * [dt=2015-07-06, hr=22]
*
* @param partition
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public abstract class TimestampedMessageParser extends MessageParser implements

private static final Logger LOG = LoggerFactory.getLogger(TimestampedMessageParser.class);

private static final long HOUR_IN_MILLIS = 3600L * 1000L;
private static final long DAY_IN_MILLIS = 3600L * 24 * 1000L;

private static final SimpleDateFormat mDtFormatter = new SimpleDateFormat("yyyy-MM-dd");
private static final SimpleDateFormat mHrFormatter = new SimpleDateFormat("HH");
private static final SimpleDateFormat mDtHrFormatter = new SimpleDateFormat("yyyy-MM-dd-HH");
Expand All @@ -42,17 +45,19 @@ public abstract class TimestampedMessageParser extends MessageParser implements
}

private final boolean mUsingHourly;
private final long mLagInSeconds;

public TimestampedMessageParser(SecorConfig config) {
super(config);
mUsingHourly = config.getMessageTimestampUsingHour();
mLagInSeconds = config.getFinalizerLagSecond();
LOG.info("UsingHourly: {}, lagInSeconds: {} ", mUsingHourly, mLagInSeconds);
mUsingHourly = usingHourly(config);
LOG.info("UsingHourly: {}", mUsingHourly);
}

public abstract long extractTimestampMillis(final Message message) throws Exception;

static boolean usingHourly(SecorConfig config) {
return config.getBoolean("partitioner.granularity.hour", false);
}

protected static long toMillis(final long timestamp) {
final long nanosecondDivider = (long) Math.pow(10, 9 + 9);
final long millisecondDivider = (long) Math.pow(10, 9 + 3);
Expand Down Expand Up @@ -131,23 +136,37 @@ public String[] getFinalizedUptoPartitions(List<Message> lastMessages,
}

// add the safety lag for late-arrival messages
long lag = mLagInSeconds * 1000L;
LOG.info("Originally: {}, adjust down {}", minMillis, lag);
return generatePartitions(minMillis - lag, mUsingHourly);
minMillis -= 3600L * 1000L;
LOG.info("adjusted millis {}", minMillis);
return generatePartitions(minMillis, mUsingHourly);
}

@Override
public String[] getPreviousPartitions(String[] partitions) throws Exception {
long millis = parsePartitions(partitions);
long delta;
if (partitions.length == 1) {
delta = 3600L * 24 * 1000L;
} else if (partitions.length == 2) {
delta = 3600L * 1000L;
boolean usingHourly = mUsingHourly;
if (mUsingHourly && millis % DAY_IN_MILLIS == 0) {
// On the day boundary, if the current partition is [dt=07-07, hr=00], the previous
// one is dt=07-06; If the current one is [dt=07-06], the previous one is
// [dt=07-06, hr=23]
// So we would return in the order of:
// dt=07-07, hr=01
// dt=07-07, hr=00
// dt=07-06
// dt=07-06, hr=23
if (partitions.length == 2 ) {
usingHourly = false;
millis -= DAY_IN_MILLIS;
} else {
usingHourly = true;
millis += DAY_IN_MILLIS;
millis -= HOUR_IN_MILLIS;
}
} else {
throw new RuntimeException("Unsupported partitions: " + partitions.length);
long delta = mUsingHourly ? HOUR_IN_MILLIS : DAY_IN_MILLIS;
millis -= delta;
}
return generatePartitions(millis - delta, partitions.length == 2);
return generatePartitions(millis, usingHourly);
}

}
12 changes: 8 additions & 4 deletions src/main/scripts/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ READER_WRITERS[binary]=com.pinterest.secor.io.impl.SequenceFileReaderWriterFacto

# The minimum wait time is one minute plus delta. Secor is configured to upload files older than
# one minute and we need to make sure that everything ends up on s3 before starting verification.
WAIT_TIME=${SECOR_WAIT_TIME:-30}
WAIT_TIME=${SECOR_WAIT_TIME:-120}
BASE_DIR=$(dirname $0)
CONF_DIR=${BASE_DIR}/..

Expand Down Expand Up @@ -192,6 +192,10 @@ verify() {
echo -e "\e[1;41;97mVerification FAILED\e[0m"
echo "See log ${LOGS_DIR}/log_verifier_${RUNMODE}.log for more details"
tail -n 50 ${LOGS_DIR}/log_verifier_${RUNMODE}.log
echo "See log ${LOGS_DIR}/secor_${RUNMODE}.log for more details"
tail -n 50 ${LOGS_DIR}/secor_${RUNMODE}.log
echo "See log ${LOGS_DIR}/test_log_message_producer.log for more details"
tail -n 50 ${LOGS_DIR}/test_log_message_producer.log
exit ${VERIFICATION_EXIT_CODE}
fi

Expand Down Expand Up @@ -288,7 +292,7 @@ post_and_finalizer_verify_test() {
OLD_ADDITIONAL_OPTS=${ADDITIONAL_OPTS}

if [ $1 = "hr" ]; then
ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dmessage.timestamp.using.hour=true -Dfinalizer.lookback.periods=30"
ADDITIONAL_OPTS="${ADDITIONAL_OPTS} -Dpartitioner.granularity.hour=true -Dsecor.finalizer.lookback.periods=30"
# should be 2 success files for hr folder, 1 for dt folder
FILES=3
else
Expand Down Expand Up @@ -357,8 +361,8 @@ move_offset_back_test() {
set_offsets_in_zookeeper 2
post_messages $((${MESSAGES}*9/10)) 0

echo "Waiting $((${WAIT_TIME}*3)) sec for Secor to upload logs to s3"
sleep $((${WAIT_TIME}*3))
echo "Waiting $((${WAIT_TIME}*2)) sec for Secor to upload logs to s3"
sleep $((${WAIT_TIME}*2))
# 4 because we skipped 2 messages per topic partition and there are 2 partitions per topic.
verify $((${MESSAGES}-4)) 0

Expand Down
Loading

0 comments on commit 32f19e0

Please sign in to comment.