diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties index 55be80479..52b252f81 100644 --- a/src/main/config/secor.common.properties +++ b/src/main/config/secor.common.properties @@ -130,10 +130,17 @@ statsd.hostport= # Name of field that contains timestamp for JSON, MessagePack, or Thrift message parser. (1405970352123) message.timestamp.name=ts +# Name of field that contains timestamp for JSON, MessagePack, or Thrift message parser. (1405970352123) +message.event.name=eid + +message.event.mapping.DEFAULT=logs/ + # Name of field that contains a timestamp, as a date Format, for JSON. (2014-08-07, Jul 23 02:16:57 2005, etc...) # Should be used when there is no timestamp in a Long format. Also ignore time zones. message.timestamp.input.pattern=yyyy-MM-dd +message.partition.byevent=true + # To enable compression, set this to a valid compression codec implementing # org.apache.hadoop.io.compress.CompressionCodec interface, such as # 'org.apache.hadoop.io.compress.GzipCodec'. diff --git a/src/main/config/secor.dev.partition.properties b/src/main/config/secor.dev.partition.properties index b924abd8a..6aef7a0b0 100644 --- a/src/main/config/secor.dev.partition.properties +++ b/src/main/config/secor.dev.partition.properties @@ -31,4 +31,17 @@ secor.local.path=/tmp/secor_dev/message_logs/partition ostrich.port=9998 # Partition Date Output format. This is used along with PatternDateMessageParser. Defaults to 'yyyy-MM-dd' *New* -secor.partition.output_dt_format=dd_MM_yyyy \ No newline at end of file +secor.partition.output_dt_format=dd_MM_yyyy + +# Output file pattern excluding prefix. Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz. +# Available placeholders are +# topic - The topic name the data is being fetched +# partition - The partition name +# generation - Generation +# kafkaPartition - The kafka partition +# fmOffset - First Message offset in the file. +# randomHex - A 4 character random hex to append to the file name +# currentTimestamp - Epoch time +# currentTime - Time in HH-mm format +# folder - Folder to use based on message id map lookup +secor.s3.output_file_pattern={partition}-{currentTimestamp}.json \ No newline at end of file diff --git a/src/main/config/secor.dev.properties b/src/main/config/secor.dev.properties index 39a0c845b..3c81ff63b 100644 --- a/src/main/config/secor.dev.properties +++ b/src/main/config/secor.dev.properties @@ -22,16 +22,4 @@ secor.max.file.size.bytes=1000000000 # 10 seconds secor.max.file.age.seconds=3600 # Compute the max file age for a topic partition using either the oldest or newest file. Defaults to newest -secor.max.file.age.policy=oldest - -# Output file pattern excluding prefix. Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz. -# Available placeholders are -# topic - The topic name the data is being fetched -# partition - The partition name -# generation - Generation -# kafkaPartition - The kafka partition -# fmOffset - First Message offset in the file. -# randomHex - A 4 character random hex to append to the file name -# currentTimestamp - Epoch time -# currentTime - Time in HH-mm format -secor.s3.output_file_pattern={randomHex}_{partition}_{topic}_{generation}.json \ No newline at end of file +secor.max.file.age.policy=oldest \ No newline at end of file diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index acb6d9c1f..648e64e21 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -214,6 +214,18 @@ public String getMessageTimestampName() { public String getMessageTimestampInputPattern() { return getString("message.timestamp.input.pattern"); } + + public String getMessageEventName() { + return getString("message.event.name"); + } + + public String getMessageEventMapping(String event) { + return getString("message.event.mapping." + event, getString("message.event.mapping.DEFAULT", "")); + } + + public boolean isMessagePartitionByEvent() { + return getBoolean("message.partition.byevent", false); + } public int getFinalizerLookbackPeriods() { return getInt("secor.finalizer.lookback.periods", 10); @@ -266,6 +278,10 @@ private String getString(String name) { checkProperty(name); return mProperties.getString(name); } + + private String getString(String name, String defaultValue) { + return mProperties.getString(name, defaultValue); + } private int getInt(String name) { checkProperty(name); diff --git a/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java b/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java index e5b5d0c16..1e30a506c 100644 --- a/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java +++ b/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java @@ -53,10 +53,12 @@ public PatternDateMessageParser(SecorConfig config) { @Override public String[] extractPartitions(Message message) { JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload()); - String result[] = { defaultDate }; - + boolean useEvent = mConfig.isMessagePartitionByEvent(); + String result[] = { useEvent ? mConfig.getMessageEventMapping("DEFAULT") + defaultDate : defaultDate }; + if (jsonObject != null) { Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName()); + Object eventValue = jsonObject.get(mConfig.getMessageEventName()); Object inputPattern = mConfig.getMessageTimestampInputPattern(); if (fieldValue != null && inputPattern != null) { try { @@ -69,7 +71,7 @@ public String[] extractPartitions(Message message) { dateFormat = inputFormatter.parse(fieldValue.toString()); } - result[0] = outputFormatter.format(dateFormat); + result[0] = useEvent ? mConfig.getMessageEventMapping(eventValue.toString()) + outputFormatter.format(dateFormat) : outputFormatter.format(dateFormat); return result; } catch (Exception e) { LOG.warn("Impossible to convert date = " + fieldValue.toString() diff --git a/src/test/java/com/pinterest/secor/parser/PatternDateMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/PatternDateMessageParserTest.java index c7f8c3b50..7eeb75dc7 100644 --- a/src/test/java/com/pinterest/secor/parser/PatternDateMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/PatternDateMessageParserTest.java @@ -35,12 +35,15 @@ public class PatternDateMessageParserTest extends TestCase { private Message mFormat2; private Message mFormat3; private Message mFormat4; + private Message mFormat5; + private Message mFormat6; private Message mInvalidDate; @Override public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageTimestampName()).thenReturn("timestamp"); + Mockito.when(mConfig.getMessageEventName()).thenReturn("event"); byte format1[] = "{\"timestamp\":\"2014-07-30 10:53:20\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}" .getBytes("UTF-8"); @@ -57,6 +60,14 @@ public void setUp() throws Exception { byte format4[] = "{\"timestamp\":1450601098673,\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}" .getBytes("UTF-8"); mFormat4 = new Message("test", 0, 0, format4); + + byte format5[] = "{\"event\":\"XYZ\",\"timestamp\":\"2014-07-30 10:53:20\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}" + .getBytes("UTF-8"); + mFormat5 = new Message("test", 0, 0, format5); + + byte format6[] = "{\"event\":\"ABC\",\"timestamp\":\"2014-07-30 10:53:20\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}" + .getBytes("UTF-8"); + mFormat6 = new Message("test", 0, 0, format6); byte invalidDate[] = "{\"timestamp\":\"11111111\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}" .getBytes("UTF-8"); @@ -81,6 +92,24 @@ public void testExtractDateUsingInputPattern() throws Exception { Mockito.when(mConfig.getPartitionOutputDtFormat()).thenReturn("yyyy/MM/dd"); assertEquals("2015/12/20", new PatternDateMessageParser(mConfig).extractPartitions(mFormat4)[0]); } + + @Test + public void testExtractPartitionName() throws Exception { + + Mockito.when(mConfig.getMessageTimestampInputPattern()).thenReturn("yyyy-MM-dd HH:mm:ss"); + Mockito.when(mConfig.getPartitionOutputDtFormat()).thenReturn("yyyy-MM-dd"); + Mockito.when(mConfig.isMessagePartitionByEvent()).thenReturn(false); + assertEquals("2014-07-30", new PatternDateMessageParser(mConfig).extractPartitions(mFormat5)[0]); + assertEquals("2014-07-30", new PatternDateMessageParser(mConfig).extractPartitions(mFormat6)[0]); + + Mockito.when(mConfig.getMessageTimestampInputPattern()).thenReturn("yyyy-MM-dd HH:mm:ss"); + Mockito.when(mConfig.getPartitionOutputDtFormat()).thenReturn("yyyy-MM-dd"); + Mockito.when(mConfig.getMessageEventMapping("XYZ")).thenReturn("1/"); + Mockito.when(mConfig.getMessageEventMapping("ABC")).thenReturn("2/"); + Mockito.when(mConfig.isMessagePartitionByEvent()).thenReturn(true); + assertEquals("1/2014-07-30", new PatternDateMessageParser(mConfig).extractPartitions(mFormat5)[0]); + assertEquals("2/2014-07-30", new PatternDateMessageParser(mConfig).extractPartitions(mFormat6)[0]); + } @Test public void testExtractDateWithWrongEntries() throws Exception {