Skip to content

Commit

Permalink
Update based on Pawel's feedback.
Browse files Browse the repository at this point in the history
Instead of hard-coding the daily/hourly concept in the PartitionFinalizer, move the concept of partition boundary and partition ordering into a new interface, Partitioner. PartitionFinalizer will work with the Partitioner to determine the last partition to finalize up to, and will collect all the partitions previous to that partition from the Partitioner.
  • Loading branch information
Henry Cai committed Jul 16, 2015
1 parent bec77cd commit 05159e8
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 305 deletions.
1 change: 1 addition & 0 deletions src/main/config/secor.prod.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ zookeeper.quorum=
# 200MB
secor.max.file.size.bytes=200000000
# 1 hour
# for hourly ingestion/finalization, set this property to a smaller value, e.g. 1800
secor.max.file.age.seconds=3600

14 changes: 7 additions & 7 deletions src/main/java/com/pinterest/secor/common/LogFilePath.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@
* @author Pawel Garbacki ([email protected])
*/
public class LogFilePath {
private String mPrefix;
private String mTopic;
private String[] mPartitions;
private int mGeneration;
private int mKafkaPartition;
private long mOffset;
private String mExtension;
private final String mPrefix;
private final String mTopic;
private final String[] mPartitions;
private final int mGeneration;
private final int mKafkaPartition;
private final long mOffset;
private final String mExtension;

public LogFilePath(String prefix, int generation, long lastCommittedOffset,
ParsedMessage message, String extension) {
Expand Down
20 changes: 14 additions & 6 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ public boolean getMessageTimestampUsingHour() {
return getBoolean("message.timestamp.using.hour", false);
}

/**
 * Reads the {@code finalizer.lag.second} property.
 *
 * @return the configured finalizer lag in seconds, or 3600 when the property is not set
 */
public int getFinalizerLagSecond() {
    final int defaultLagSeconds = 3600;
    return getInt("finalizer.lag.second", defaultLagSeconds);
}

/**
 * Reads the {@code finalizer.lookback.periods} property.
 *
 * @return the configured number of lookback periods, or 10 when the property is not set
 */
public int getFinalizerLookbackPeriods() {
    final int defaultLookbackPeriods = 10;
    return getInt("finalizer.lookback.periods", defaultLookbackPeriods);
}

public String getHivePrefix() {
return getString("secor.hive.prefix");
}
Expand Down Expand Up @@ -256,12 +264,12 @@ private int getInt(String name) {
return mProperties.getInt(name);
}

private boolean getBoolean(String name, boolean required) {
if (required) {
checkProperty(name);
return mProperties.getBoolean(name);
}
return mProperties.getBoolean(name, false);
/**
 * Looks up an integer property, falling back to a default.
 *
 * @param name the property key
 * @param defaultValue the value handed to the underlying lookup as its fallback
 * @return whatever {@code mProperties.getInt(name, defaultValue)} yields
 */
private int getInt(String name, int defaultValue) {
    final int value = mProperties.getInt(name, defaultValue);
    return value;
}

/**
 * Looks up a boolean property, falling back to a default.
 *
 * @param name the property key
 * @param defaultValue the value handed to the underlying lookup as its fallback
 * @return whatever {@code mProperties.getBoolean(name, defaultValue)} yields
 */
private boolean getBoolean(String name, boolean defaultValue) {
    final boolean value = mProperties.getBoolean(name, defaultValue);
    return value;
}

private long getLong(String name) {
Expand Down
326 changes: 112 additions & 214 deletions src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java

Large diffs are not rendered by default.

97 changes: 97 additions & 0 deletions src/main/java/com/pinterest/secor/parser/Partitioner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.secor.parser;

import com.pinterest.secor.message.Message;

import java.util.List;

/**
* The Partitioner knows when a file folder partition can be finalized.
*
* A file folder partition (e.g. dt=2015-07-07) can be finalized when all
* messages for that date have arrived. The caller (PartitionFinalizer) does
* the finalization work (e.g. generating the _SUCCESS file, performing hive
* registration).
*
* The partitioner provides one method to calculate the range of file folder
* partitions to be finalized and another method to iterate through that
* range.
*
* The caller first provides the last-enqueued and last-committed messages
* for a given kafka topic and calls {@link #getFinalizedUptoPartitions} to
* get the finalize-up-to partition, and then walks backwards by calling
* {@link #getPreviousPartitions} to collect all the previous partitions which
* are ready to be finalized.
*
* Note that the finalize-up-to partition itself is not included in the range
* of partitions to be finalized.
*
* The caller might repeat this loop multiple times when the filesystem
* partition is multi-dimensional (e.g. [dt=2015-07-07,hr=05]). It will loop
* once for the hourly folder finalization and another time for the daily
* folder.
*
* Note that although daily/hourly partitions are used to illustrate the use
* of the partitioner, the partitioner is by no means limited to timestamp
* based partitioning. It should also be able to work with offset based
* partitioning, as long as an iteration order is established within those
* partitions.
*
* @author Henry Cai ([email protected])
*/
public interface Partitioner {
/**
* Calculates the partition to finalize up to from a list of last-enqueued
* messages and a list of last-consumed (committed) messages.
*
* For each kafka partition of a given topic, the caller provides two
* messages:
* * lastMessage: the last message at the tail of the kafka queue
* * committedMessage: the message secor consumed and committed
* The caller iterates over all the kafka partitions of the given topic and
* gathers the above two messages into two lists.
*
* The partitioner compares the messages from all kafka partitions to see
* which one is the earliest to finalize up to. The partitioner will
* normally use the timestamp from the committedMessage to decide the
* finalize time. But for slow topics where no new messages have arrived for
* a while (i.e. lastMessage == committedMessage), the partitioner can use
* the current time as the finalize time.
*
* Note that the up-to partition itself is not included in the range to be
* finalized. For example, when the last message is in 2015-07-07,
* 7/7 itself is not complete yet.
*
* Note also that the partitioner might want to adjust the finalize time
* down to allow a safety lag for late-arriving messages, e.g. by adding one
* extra hour of lag.
*
* @param lastMessages the last message at the tail of the queue, one per
* kafka partition
* @param committedMessages the message secor consumed and committed, one
* per kafka partition
*
* @return a String array representing the file folder partition to
* finalize up to (exclusive)
* @throws Exception if the finalize-up-to partition cannot be calculated
*/
String[] getFinalizedUptoPartitions(List<Message> lastMessages,
List<Message> committedMessages) throws Exception;

/**
* Gets the partition that comes immediately before the given partition.
* E.g. for ["dt=2015-07-07","hr=05"], it will return ["dt=2015-07-07","hr=04"]
*
* @param partition the file folder partition to step back from
* @return the file folder partition preceding {@code partition} in the
* partitioner's iteration order
* @throws Exception if the previous partition cannot be calculated
*/
String[] getPreviousPartitions(String[] partition) throws Exception;
}
113 changes: 98 additions & 15 deletions src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,36 @@
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public abstract class TimestampedMessageParser extends MessageParser {
public abstract class TimestampedMessageParser extends MessageParser implements Partitioner {

private static final Logger LOG = LoggerFactory.getLogger(TimestampedMessageParser.class);

private static final SimpleDateFormat mDtFormatter = new SimpleDateFormat("yyyy-MM-dd");
private static final SimpleDateFormat mHrFormatter = new SimpleDateFormat("HH");
private static final SimpleDateFormat mDtHrFormatter = new SimpleDateFormat("yyyy-MM-dd-HH");

static {
mDtFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
mHrFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
mDtHrFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
}

private final SimpleDateFormat dtFormatter;
private final SimpleDateFormat hrFormatter;
private final boolean usingHourly;
private final boolean mUsingHourly;
private final long mLagInSeconds;

public TimestampedMessageParser(SecorConfig config) {
super(config);
dtFormatter = new SimpleDateFormat("yyyy-MM-dd");
dtFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
hrFormatter = new SimpleDateFormat("HH");
hrFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
usingHourly = config.getMessageTimestampUsingHour();
mUsingHourly = config.getMessageTimestampUsingHour();
mLagInSeconds = config.getFinalizerLagSecond();
LOG.info("UsingHourly: {}, lagInSeconds: {} ", mUsingHourly, mLagInSeconds);
}

public abstract long extractTimestampMillis(final Message message) throws Exception;
Expand All @@ -54,17 +67,87 @@ protected static long toMillis(final long timestamp) {
return timestampMillis;
}

/**
 * Builds the file folder partitions for a timestamp, formatted in UTC.
 *
 * @param timestampMillis the time to partition on, in milliseconds since the epoch
 * @param usingHourly whether to append the {@code hr=HH} partition after the date partition
 * @return {@code {"dt=yyyy-MM-dd"}}, or {@code {"dt=yyyy-MM-dd", "hr=HH"}} when
 *         {@code usingHourly} is true
 * @throws Exception kept in the signature for callers and overriders
 */
protected String[] generatePartitions(long timestampMillis, boolean usingHourly)
        throws Exception {
    Date date = new Date(timestampMillis);
    String dt;
    String hr;
    // The formatters are static, so every parser instance shares them, and
    // SimpleDateFormat is not thread-safe. Guard each one with its own lock so
    // parsers running on different threads cannot corrupt each other's output.
    synchronized (mDtFormatter) {
        dt = "dt=" + mDtFormatter.format(date);
    }
    synchronized (mHrFormatter) {
        hr = "hr=" + mHrFormatter.format(date);
    }
    if (usingHourly) {
        return new String[]{dt, hr};
    } else {
        return new String[]{dt};
    }
}

/**
 * Converts file folder partitions back into the timestamp they represent.
 *
 * @param partitions {@code "dt=yyyy-MM-dd"} optionally followed by {@code "hr=HH"};
 *                   when the hour partition is absent, hour {@code 00} is used
 * @return the parsed time in milliseconds since the epoch
 * @throws Exception if the combined value cannot be parsed as {@code yyyy-MM-dd-HH}
 */
protected long parsePartitions(String[] partitions) throws Exception {
    String dtValue = partitions[0].split("=")[1];
    String hrValue = partitions.length > 1 ? partitions[1].split("=")[1] : "00";
    String value = dtValue + "-" + hrValue;
    Date date;
    // The formatter is static, so every parser instance shares it, and
    // SimpleDateFormat is not thread-safe. Serialize access to it.
    synchronized (mDtHrFormatter) {
        date = mDtHrFormatter.parse(value);
    }
    return date.getTime();
}

@Override
public String[] extractPartitions(Message message) throws Exception {
// Date constructor takes milliseconds since epoch.
long timestampMillis = extractTimestampMillis(message);
Date date = new Date(timestampMillis);
String dt = "dt=" + dtFormatter.format(date);
String hr = "hr=" + hrFormatter.format(date);
if (usingHourly) {
return new String[]{dt, hr};
return generatePartitions(timestampMillis, mUsingHourly);
}

/**
 * Picks the finalize time for one kafka partition.
 *
 * The committed message's timestamp is used unless the last and committed
 * timestamps are equal and the last timestamp is more than one hour behind
 * the current time; in that case the current time is used instead.
 *
 * @param lastMessage the message at the tail of the kafka queue
 * @param committedMessage the message that was consumed and committed
 * @return the finalize time in milliseconds
 * @throws Exception if a timestamp cannot be extracted from either message
 */
private long getFinalizedTimestampMillis(Message lastMessage,
        Message committedMessage) throws Exception {
    final long tailMillis = extractTimestampMillis(lastMessage);
    final long consumedMillis = extractTimestampMillis(committedMessage);
    final long currentMillis = System.currentTimeMillis();

    final boolean caughtUp = tailMillis == consumedMillis;
    final boolean idleOverAnHour = (currentMillis - tailMillis) > 3600 * 1000;
    if (!caughtUp || !idleOverAnHour) {
        return consumedMillis;
    }

    LOG.info("No new message coming, use the current time: " + currentMillis);
    return currentMillis;
}

/**
 * Calculates the partition to finalize up to across all kafka partitions.
 *
 * The finalize time is computed for each last/committed message pair, and the
 * earliest one wins. The configured safety lag is then subtracted before the
 * result is converted into file folder partitions.
 *
 * @param lastMessages the message at the tail of the queue, one per kafka partition
 * @param committedMessages the consumed and committed message, one per kafka
 *                          partition, in the same order as {@code lastMessages}
 * @return the partition to finalize up to, or {@code null} when either list is
 *         null or no timestamp could be derived (e.g. both lists are empty)
 * @throws IllegalArgumentException if the two lists differ in size
 * @throws Exception if a timestamp cannot be extracted from a message
 */
@Override
public String[] getFinalizedUptoPartitions(List<Message> lastMessages,
        List<Message> committedMessages) throws Exception {
    if (lastMessages == null || committedMessages == null) {
        LOG.error("Either: {} and {} is null", lastMessages,
            committedMessages);
        return null;
    }
    // Checked explicitly rather than with `assert`: assertions are disabled by
    // default, and a mismatch would then either fail with a bare
    // IndexOutOfBoundsException or silently ignore the extra messages.
    if (lastMessages.size() != committedMessages.size()) {
        throw new IllegalArgumentException(
            "lastMessages and committedMessages must have the same size, got "
                + lastMessages.size() + " and " + committedMessages.size());
    }

    long minMillis = Long.MAX_VALUE;
    for (int i = 0; i < lastMessages.size(); i++) {
        long millis = getFinalizedTimestampMillis(lastMessages.get(i),
            committedMessages.get(i));
        if (millis < minMillis) {
            minMillis = millis;
        }
    }
    // Still at the sentinel means no pair contributed a timestamp.
    if (minMillis == Long.MAX_VALUE) {
        LOG.error("No valid timestamps among messages: {} and {}", lastMessages,
            committedMessages);
        return null;
    }

    // add the safety lag for late-arrival messages
    long lag = mLagInSeconds * 1000L;
    LOG.info("Originally: {}, adjust down {}", minMillis, lag);
    return generatePartitions(minMillis - lag, mUsingHourly);
}

@Override
public String[] getPreviousPartitions(String[] partitions) throws Exception {
long millis = parsePartitions(partitions);
long delta;
if (partitions.length == 1) {
delta = 3600L * 24 * 1000L;
} else if (partitions.length == 2) {
delta = 3600L * 1000L;
} else {
return new String[]{dt};
throw new RuntimeException("Unsupported partitions: " + partitions.length);
}
return generatePartitions(millis - delta, partitions.length == 2);
}

}
Loading

0 comments on commit 05159e8

Please sign in to comment.