Skip to content

Commit

Permalink
Partition files by date and message id
Browse files Browse the repository at this point in the history
  • Loading branch information
Santhosh Vasabhaktula authored and Santhosh Vasabhaktula committed Mar 12, 2016
1 parent 1c74396 commit 0259422
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 17 deletions.
7 changes: 7 additions & 0 deletions src/main/config/secor.common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,17 @@ statsd.hostport=
# Name of field that contains timestamp for JSON, MessagePack, or Thrift message parser. (1405970352123)
message.timestamp.name=ts

# Name of field that contains timestamp for JSON, MessagePack, or Thrift message parser. (1405970352123)
message.event.name=eid

message.event.mapping.DEFAULT=logs/

# Name of field that contains a timestamp, as a date Format, for JSON. (2014-08-07, Jul 23 02:16:57 2005, etc...)
# Should be used when there is no timestamp in a Long format. Also ignore time zones.
message.timestamp.input.pattern=yyyy-MM-dd

message.partition.byevent=true

# To enable compression, set this to a valid compression codec implementing
# org.apache.hadoop.io.compress.CompressionCodec interface, such as
# 'org.apache.hadoop.io.compress.GzipCodec'.
Expand Down
15 changes: 14 additions & 1 deletion src/main/config/secor.dev.partition.properties
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,17 @@ secor.local.path=/tmp/secor_dev/message_logs/partition
ostrich.port=9998

# Partition Date Output format. This is used along with PatternDateMessageParser. Defaults to 'yyyy-MM-dd' *New*
secor.partition.output_dt_format=dd_MM_yyyy
secor.partition.output_dt_format=dd_MM_yyyy

# Output file pattern excluding prefix. Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz.
# Available placeholders are
# topic - The topic name the data is being fetched
# partition - The partition name
# generation - Generation
# kafkaPartition - The kafka partition
# fmOffset - First Message offset in the file.
# randomHex - A 4 character random hex to append to the file name
# currentTimestamp - Epoch time
# currentTime - Time in HH-mm format
# folder - Folder to use based on message id map lookup
secor.s3.output_file_pattern={partition}-{currentTimestamp}.json
14 changes: 1 addition & 13 deletions src/main/config/secor.dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,4 @@ secor.max.file.size.bytes=1000000000
# 10 seconds
secor.max.file.age.seconds=3600
# Compute the max file age for a topic partition using either the oldest or newest file. Defaults to newest
secor.max.file.age.policy=oldest

# Output file pattern excluding prefix. Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz.
# Available placeholders are
# topic - The topic name the data is being fetched
# partition - The partition name
# generation - Generation
# kafkaPartition - The kafka partition
# fmOffset - First Message offset in the file.
# randomHex - A 4 character random hex to append to the file name
# currentTimestamp - Epoch time
# currentTime - Time in HH-mm format
secor.s3.output_file_pattern={randomHex}_{partition}_{topic}_{generation}.json
secor.max.file.age.policy=oldest
16 changes: 16 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,18 @@ public String getMessageTimestampName() {
public String getMessageTimestampInputPattern() {
return getString("message.timestamp.input.pattern");
}

public String getMessageEventName() {
return getString("message.event.name");
}

public String getMessageEventMapping(String event) {
return getString("message.event.mapping." + event, getString("message.event.mapping.DEFAULT", ""));
}

public boolean isMessagePartitionByEvent() {
return getBoolean("message.partition.byevent", false);
}

public int getFinalizerLookbackPeriods() {
return getInt("secor.finalizer.lookback.periods", 10);
Expand Down Expand Up @@ -266,6 +278,10 @@ private String getString(String name) {
checkProperty(name);
return mProperties.getString(name);
}

private String getString(String name, String defaultValue) {
return mProperties.getString(name, defaultValue);
}

private int getInt(String name) {
checkProperty(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ public PatternDateMessageParser(SecorConfig config) {
@Override
public String[] extractPartitions(Message message) {
JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload());
String result[] = { defaultDate };

boolean useEvent = mConfig.isMessagePartitionByEvent();
String result[] = { useEvent ? mConfig.getMessageEventMapping("DEFAULT") + defaultDate : defaultDate };

if (jsonObject != null) {
Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName());
Object eventValue = jsonObject.get(mConfig.getMessageEventName());
Object inputPattern = mConfig.getMessageTimestampInputPattern();
if (fieldValue != null && inputPattern != null) {
try {
Expand All @@ -69,7 +71,7 @@ public String[] extractPartitions(Message message) {
dateFormat = inputFormatter.parse(fieldValue.toString());
}

result[0] = outputFormatter.format(dateFormat);
result[0] = useEvent ? mConfig.getMessageEventMapping(eventValue.toString()) + outputFormatter.format(dateFormat) : outputFormatter.format(dateFormat);
return result;
} catch (Exception e) {
LOG.warn("Impossible to convert date = " + fieldValue.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ public class PatternDateMessageParserTest extends TestCase {
private Message mFormat2;
private Message mFormat3;
private Message mFormat4;
private Message mFormat5;
private Message mFormat6;
private Message mInvalidDate;

@Override
public void setUp() throws Exception {
mConfig = Mockito.mock(SecorConfig.class);
Mockito.when(mConfig.getMessageTimestampName()).thenReturn("timestamp");
Mockito.when(mConfig.getMessageEventName()).thenReturn("event");

byte format1[] = "{\"timestamp\":\"2014-07-30 10:53:20\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"[email protected]\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
Expand All @@ -57,6 +60,14 @@ public void setUp() throws Exception {
byte format4[] = "{\"timestamp\":1450601098673,\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"[email protected]\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
mFormat4 = new Message("test", 0, 0, format4);

byte format5[] = "{\"event\":\"XYZ\",\"timestamp\":\"2014-07-30 10:53:20\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"[email protected]\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
mFormat5 = new Message("test", 0, 0, format5);

byte format6[] = "{\"event\":\"ABC\",\"timestamp\":\"2014-07-30 10:53:20\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"[email protected]\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
mFormat6 = new Message("test", 0, 0, format6);

byte invalidDate[] = "{\"timestamp\":\"11111111\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"[email protected]\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
Expand All @@ -81,6 +92,24 @@ public void testExtractDateUsingInputPattern() throws Exception {
Mockito.when(mConfig.getPartitionOutputDtFormat()).thenReturn("yyyy/MM/dd");
assertEquals("2015/12/20", new PatternDateMessageParser(mConfig).extractPartitions(mFormat4)[0]);
}

@Test
public void testExtractPartitionName() throws Exception {

Mockito.when(mConfig.getMessageTimestampInputPattern()).thenReturn("yyyy-MM-dd HH:mm:ss");
Mockito.when(mConfig.getPartitionOutputDtFormat()).thenReturn("yyyy-MM-dd");
Mockito.when(mConfig.isMessagePartitionByEvent()).thenReturn(false);
assertEquals("2014-07-30", new PatternDateMessageParser(mConfig).extractPartitions(mFormat5)[0]);
assertEquals("2014-07-30", new PatternDateMessageParser(mConfig).extractPartitions(mFormat6)[0]);

Mockito.when(mConfig.getMessageTimestampInputPattern()).thenReturn("yyyy-MM-dd HH:mm:ss");
Mockito.when(mConfig.getPartitionOutputDtFormat()).thenReturn("yyyy-MM-dd");
Mockito.when(mConfig.getMessageEventMapping("XYZ")).thenReturn("1/");
Mockito.when(mConfig.getMessageEventMapping("ABC")).thenReturn("2/");
Mockito.when(mConfig.isMessagePartitionByEvent()).thenReturn(true);
assertEquals("1/2014-07-30", new PatternDateMessageParser(mConfig).extractPartitions(mFormat5)[0]);
assertEquals("2/2014-07-30", new PatternDateMessageParser(mConfig).extractPartitions(mFormat6)[0]);
}

@Test
public void testExtractDateWithWrongEntries() throws Exception {
Expand Down

0 comments on commit 0259422

Please sign in to comment.