Skip to content

Commit

Permalink
Merge pull request pinterest#109 from HenryCaiHaiying/master
Browse files Browse the repository at this point in the history
    Adding support for hourly s3 data ingestion from secor:
  • Loading branch information
HenryCaiHaiying committed Jul 22, 2015
2 parents d7679c6 + 32f19e0 commit d7ba0a8
Show file tree
Hide file tree
Showing 14 changed files with 664 additions and 192 deletions.
16 changes: 16 additions & 0 deletions src/main/config/secor.common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ secor.offsets.per.partition=10000000
# How long does it take for secor to forget a topic partition. Applies to stats generation only.
secor.topic_partition.forget.seconds=600

# Setting the partitioner to use hourly partition
# By default, the partitioner will do daily partition, so the data will be
# written into
# s3n://.../topic/dt=2015-07-07/
# If this parameter is set to true, the data will be written into
# s3n://.../topic/dt=2015-07-07/hr=02
# The hour folder ranges from 00 to 23
# partitioner.granularity.hour=true

# During partition finalization, the finalizer will start from the last
# time partition (e.g. dt=2015-07-17) and traverse backwards for n
# partition periods (e.g. dt=2015-07-16, dt=2015-07-15 ...)
# This parameter controls how many partition periods to traverse back
# The default is 10
# secor.finalizer.lookback.periods=10

# If greater than 0, upon startup Secor will clean up directories and files under secor.local.path
# that are older than this value.
secor.local.log.delete.age.hours=-1
Expand Down
11 changes: 11 additions & 0 deletions src/main/config/secor.dev.hr.partition.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
include=secor.dev.properties

secor.kafka.group=secor_hr_partition
secor.message.parser.class=com.pinterest.secor.parser.ThriftMessageParser

secor.s3.path=secor_dev/hr_partition
secor.local.path=/tmp/secor_dev/message_logs/hr_partition

partitioner.granularity.hour=true

ostrich.port=9998
4 changes: 2 additions & 2 deletions src/main/config/secor.dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ zookeeper.quorum=localhost:2181
# Upload policies.
# 10K
secor.max.file.size.bytes=10000
# 1 minute
secor.max.file.age.seconds=60
# 10 seconds
secor.max.file.age.seconds=10

1 change: 1 addition & 0 deletions src/main/config/secor.prod.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ zookeeper.quorum=
# 200MB
secor.max.file.size.bytes=200000000
# 1 hour
# For hourly ingestion/finalization, set this property to a smaller value, e.g. 1800
secor.max.file.age.seconds=3600

14 changes: 7 additions & 7 deletions src/main/java/com/pinterest/secor/common/LogFilePath.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@
* @author Pawel Garbacki ([email protected])
*/
public class LogFilePath {
private String mPrefix;
private String mTopic;
private String[] mPartitions;
private int mGeneration;
private int mKafkaPartition;
private long mOffset;
private String mExtension;
private final String mPrefix;
private final String mTopic;
private final String[] mPartitions;
private final int mGeneration;
private final int mKafkaPartition;
private final long mOffset;
private final String mExtension;

public LogFilePath(String prefix, int generation, long lastCommittedOffset,
ParsedMessage message, String extension) {
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ public String getMessageTimestampInputPattern() {
return getString("message.timestamp.input.pattern");
}

/**
 * Reads the {@code secor.finalizer.lookback.periods} option, which controls how
 * many partition periods the finalizer traverses backwards.
 *
 * @return the configured number of lookback periods; 10 is supplied as the default
 */
public int getFinalizerLookbackPeriods() {
    final int lookbackPeriods = getInt("secor.finalizer.lookback.periods", 10);
    return lookbackPeriods;
}

/**
 * Reads the {@code secor.hive.prefix} option.
 *
 * @return the value obtained from {@code getString} for that key
 */
public String getHivePrefix() {
    final String hivePrefix = getString("secor.hive.prefix");
    return hivePrefix;
}
Expand All @@ -235,6 +239,10 @@ public String getZookeeperPath() {
return getString("secor.zookeeper.path");
}

/**
 * Looks up a boolean option in the underlying properties, passing along a
 * fallback value.
 *
 * @param name         key of the option to read
 * @param defaultValue value handed to the underlying lookup as its default
 *                     (presumably returned when the key is absent; confirm
 *                     against the type of {@code mProperties})
 * @return the result of the underlying boolean lookup
 */
public boolean getBoolean(String name, boolean defaultValue) {
    final boolean flag = mProperties.getBoolean(name, defaultValue);
    return flag;
}

private void checkProperty(String name) {
if (!mProperties.containsKey(name)) {
throw new RuntimeException("Failed to find required configuration option '" +
Expand All @@ -252,6 +260,10 @@ private int getInt(String name) {
return mProperties.getInt(name);
}

/**
 * Looks up an integer option in the underlying properties, passing along a
 * fallback value.
 *
 * @param name         key of the option to read
 * @param defaultValue value handed to the underlying lookup as its default
 *                     (presumably returned when the key is absent; confirm
 *                     against the type of {@code mProperties})
 * @return the result of the underlying integer lookup
 */
private int getInt(String name, int defaultValue) {
    final int resolved = mProperties.getInt(name, defaultValue);
    return resolved;
}

/**
 * Looks up a long option in the underlying properties.
 *
 * @param name key of the option to read
 * @return the result of the underlying long lookup
 */
private long getLong(String name) {
    final long resolved = mProperties.getLong(name);
    return resolved;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
.withArgName("<type>")
.withType(String.class)
.create("type"));
options.addOption(OptionBuilder.withLongOpt("broker")
.withDescription("broker string, e.g. localhost:9092")
.hasArg()
.withArgName("<broker>")
.withType(String.class)
.create("broker"));
options.addOption(OptionBuilder.withLongOpt("timeshift")
.withDescription("message timestamp adjustment in seconds, it will be deducted" +
" from current time")
.hasArg()
.withArgName("<timeshift>")
.withType(Number.class)
.create("timeshift"));

CommandLineParser parser = new GnuParser();
return parser.parse(options, args);
Expand All @@ -74,14 +87,18 @@ public static void main(String[] args) {
String topic = commandLine.getOptionValue("topic");
int messages = ((Number) commandLine.getParsedOptionValue("messages")).intValue();
int producers = ((Number) commandLine.getParsedOptionValue("producers")).intValue();
String broker = commandLine.getOptionValue("broker");
String type = commandLine.getOptionValue("type");
Number timeshiftNumber = ((Number)commandLine.getParsedOptionValue("timeshift"));
int timeshift = timeshiftNumber == null ? 0 : timeshiftNumber.intValue();
for (int i = 0; i < producers; ++i) {
TestLogMessageProducer producer = new TestLogMessageProducer(topic, messages, type);
TestLogMessageProducer producer = new TestLogMessageProducer(
topic, messages, type, broker, timeshift);
producer.start();
}
} catch (Throwable t) {
LOG.error("Log message producer failed", t);
System.exit(1);
}
}
}
}
Loading

0 comments on commit d7ba0a8

Please sign in to comment.