diff --git a/DESIGN.md b/DESIGN.md
old mode 100755
new mode 100644
diff --git a/Dockerfile b/Dockerfile
old mode 100755
new mode 100644
diff --git a/LICENSE b/LICENSE
old mode 100755
new mode 100644
diff --git a/Makefile b/Makefile
old mode 100755
new mode 100644
diff --git a/README.md b/README.md
old mode 100755
new mode 100644
index eddeb0ec2..c8383bc94
--- a/README.md
+++ b/README.md
@@ -29,13 +29,13 @@ Edit `src/main/config/*.properties` files to specify parameters describing the e
##### Create and install jars
```sh
-# By default this will install the "release" (Kafka 0.8 profile)
+# By default this will install the "release" (Kafka 0.10 profile)
mvn package
mkdir ${SECOR_INSTALL_DIR} # directory to place Secor binaries in.
tar -zxvf target/secor-0.1-SNAPSHOT-bin.tar.gz -C ${SECOR_INSTALL_DIR}
-# To use the Kafka 0.10 client you should use the kafka-0.10-dev profile
-mvn -Pkafka-0.10-dev package
+# To use the Kafka 0.8 client you should use the kafka-0.8-dev profile
+mvn -Pkafka-0.8-dev package
```
##### Run tests (optional)
@@ -62,7 +62,9 @@ One of the convenience features of Secor is the ability to group messages and sa
- **Thrift date parser**: parser that extracts timestamps from thrift messages and groups the output based on the date (at a day granularity). To keep things simple, this parser assumes that the timestamp is carried in the first field (id 1) of the thrift message schema by default. The field id can be changed by setting ```message.timestamp.id``` as long as the field is at the top level of the thrift object (i.e. it is not in a nested structure). The timestamp may be expressed either in seconds or milliseconds, or nanoseconds since the epoch. The output goes to date-partitioned paths (e.g., ```s3n://bucket/topic/dt=2014-05-01```, ```s3n://bucket/topic/dt=2014-05-02```). Date partitioning is particularly convenient if the output is to be consumed by ETL tools such as [Hive]. To use this parser, start Secor with properties file [secor.prod.partition.properties](src/main/config/secor.prod.partition.properties). Note the ```message.timestamp.name``` property has no effect on the thrift parsing, which is determined by the field id.
-- **JSON date parser**: parser that extracts timestamps from JSON messages and groups the output based on the date, similar to the Thrift parser above. To use this parser, start Secor with properties file [secor.prod.partition.properties](src/main/config/secor.prod.partition.properties) and set `secor.message.parser.class=com.pinterest.secor.parser.JsonMessageParser`. You may override the field used to extract the timestamp by setting the "message.timestamp.name" property.
+- **JSON timestamp parser**: parser that extracts UNIX timestamps from JSON messages and groups the output based on the date, similar to the Thrift parser above. To use this parser, start Secor with properties file [secor.prod.partition.properties](src/main/config/secor.prod.partition.properties) and set `secor.message.parser.class=com.pinterest.secor.parser.JsonMessageParser`. You may override the field used to extract the timestamp by setting the "message.timestamp.name" property.
+
+- **JSON ISO 8601 date parser**: Assumes your timestamp field uses ISO 8601. To use this parser, start Secor with properties file [secor.prod.partition.properties](src/main/config/secor.prod.partition.properties) and set `secor.message.parser.class=com.pinterest.secor.parser.Iso8601MessageParser`. You may override the field used to extract the timestamp by setting the "message.timestamp.name" property.
- **MessagePack date parser**: parser that extracts timestamps from MessagePack messages and groups the output based on the date, similar to the Thrift and JSON parser. To use this parser, set `secor.message.parser.class=com.pinterest.secor.parser.MessagePackParser`. Like the Thrift parser, the timestamp may be expressed either in seconds or milliseconds, or nanoseconds since the epoch and respects the "message.timestamp.name" property.
@@ -176,7 +178,7 @@ Secor is distributed under [Apache License, Version 2.0](http://www.apache.org/l
* [Rakuten](http://techblog.rakuten.co.jp/)
* [Appsflyer](https://www.appsflyer.com)
* [Wego](https://www.wego.com)
- * [GO_JEK](http://gojekengineering.com/)
+ * [GO-JEK](http://gojekengineering.com/)
## Help
diff --git a/containers/trusty/Dockerfile b/containers/trusty/Dockerfile
old mode 100755
new mode 100644
diff --git a/containers/xenial/Dockerfile b/containers/xenial/Dockerfile
old mode 100755
new mode 100644
diff --git a/pom.xml b/pom.xml
old mode 100755
new mode 100644
index 1578a9ac3..792e229de
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.pinterest
secor
- 0.23
+ 0.24-SNAPSHOT
jar
secor
Kafka to s3/gs/swift logs exporter
@@ -100,12 +100,12 @@
com.amazonaws
aws-java-sdk-s3
- 1.10.68
+ 1.11.160
com.amazonaws
aws-java-sdk-sts
- 1.10.68
+ 1.11.160
net.java.dev.jets3t
@@ -536,6 +536,14 @@
true
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ 1.7
+
+
@@ -602,6 +610,20 @@
kafka-0.8-dev
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 2.0.2
+
+
+ com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java
+
+
+
+
+
org.apache.kafka
diff --git a/src/main/assembly/secor.xml b/src/main/assembly/secor.xml
old mode 100755
new mode 100644
diff --git a/src/main/config/kafka.test.properties b/src/main/config/kafka.test.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/log4j.dev.properties b/src/main/config/log4j.dev.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/log4j.docker.properties b/src/main/config/log4j.docker.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/log4j.prod.properties b/src/main/config/log4j.prod.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties
old mode 100755
new mode 100644
index 857c2eadc..600060988
--- a/src/main/config/secor.common.properties
+++ b/src/main/config/secor.common.properties
@@ -181,6 +181,17 @@ kafka.dual.commit.enabled=true
# Possible values: "zookeeper" to read offset from zookeeper or "kafka" to read offset from kafka consumer topic
kafka.offsets.storage=zookeeper
+# Parameter which tells whether to extract Kafka message timestamp. This value is to be chose in case of 0.10.x kafka brokers.
+# Default value is false. Also specify `kafka.message.timestamp.className` as `com.pinterest.secor.timestamp.Kafka10MessageTimestamp`,
+# in case you are enabling this parameter as `true`.
+kafka.useTimestamp=false
+
+# Classname for the timestamp field you want to use. Default is `com.pinterest.secor.timestamp.Kafka10MessageTimestamp`
+# for 0.10 build profile. Basically, it will be `Kafka8MessageTimestamp` for 0.8 kafka and `Kafka10MessageTimestamp`
+# for 0.10 kafka. Fully classified names are `com.pinterest.secor.timestamp.Kafka8MessageTimestamp` and
+# `com.pinterest.secor.timestamp.Kafka10MessageTimestamp`.
+kafka.message.timestamp.className=com.pinterest.secor.timestamp.Kafka10MessageTimestamp
+
# Secor generation is a version that should be incremented during non-backwards-compatible
# Secor releases. Generation number is one of the components of generated log file names.
# Generation number makes sure that outputs of different Secor versions are isolated.
@@ -404,3 +415,4 @@ parquet.validation=false
secor.orc.message.schema.*=struct\,f:array\,g:int>
# Below config used for defining ORC schema provider class name. User can use the custom implementation for orc schema provider
secor.orc.schema.provider=com.pinterest.secor.util.orc.schema.DefaultORCSchemaProvider
+
diff --git a/src/main/config/secor.dev.azure.properties b/src/main/config/secor.dev.azure.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.dev.backup.properties b/src/main/config/secor.dev.backup.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.dev.gs.properties b/src/main/config/secor.dev.gs.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.dev.hr.partition.properties b/src/main/config/secor.dev.hr.partition.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.dev.partition.properties b/src/main/config/secor.dev.partition.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.dev.properties b/src/main/config/secor.dev.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.prod.backup.properties b/src/main/config/secor.prod.backup.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.prod.partition.properties b/src/main/config/secor.prod.partition.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.prod.properties b/src/main/config/secor.prod.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.test.backup.properties b/src/main/config/secor.test.backup.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.test.partition.properties b/src/main/config/secor.test.partition.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.test.perf.backup.properties b/src/main/config/secor.test.perf.backup.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/secor.test.properties b/src/main/config/secor.test.properties
old mode 100755
new mode 100644
diff --git a/src/main/config/zookeeper.test.properties b/src/main/config/zookeeper.test.properties
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/common/FileRegistry.java b/src/main/java/com/pinterest/secor/common/FileRegistry.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/common/KafkaClient.java b/src/main/java/com/pinterest/secor/common/KafkaClient.java
old mode 100755
new mode 100644
index 6da05cc69..82c7b86ee
--- a/src/main/java/com/pinterest/secor/common/KafkaClient.java
+++ b/src/main/java/com/pinterest/secor/common/KafkaClient.java
@@ -18,6 +18,7 @@
import com.google.common.net.HostAndPort;
import com.pinterest.secor.message.Message;
+import com.pinterest.secor.timestamp.KafkaMessageTimestampFactory;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
@@ -50,10 +51,12 @@ public class KafkaClient {
private SecorConfig mConfig;
private ZookeeperConnector mZookeeperConnector;
+ private KafkaMessageTimestampFactory mKafkaMessageTimestampFactory;
public KafkaClient(SecorConfig config) {
mConfig = config;
mZookeeperConnector = new ZookeeperConnector(mConfig);
+ mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
private HostAndPort findLeader(TopicPartition topicPartition) {
@@ -141,8 +144,12 @@ private Message getMessage(TopicPartition topicPartition, long offset,
payloadBytes = new byte[payload.limit()];
payload.get(payloadBytes);
}
+ long timestamp = (mConfig.useKafkaTimestamp())
+ ? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(messageAndOffset)
+ : 0l;
+
return new Message(topicPartition.getTopic(), topicPartition.getPartition(),
- messageAndOffset.offset(), keyBytes, payloadBytes);
+ messageAndOffset.offset(), keyBytes, payloadBytes, timestamp);
}
private SimpleConsumer createConsumer(String host, int port, String clientName) {
diff --git a/src/main/java/com/pinterest/secor/common/LogFilePath.java b/src/main/java/com/pinterest/secor/common/LogFilePath.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/common/OffsetTracker.java b/src/main/java/com/pinterest/secor/common/OffsetTracker.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/common/OstrichAdminService.java b/src/main/java/com/pinterest/secor/common/OstrichAdminService.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java
old mode 100755
new mode 100644
index f436b8c08..993a3163a
--- a/src/main/java/com/pinterest/secor/common/SecorConfig.java
+++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -131,6 +131,14 @@ public String getOffsetsStorage() {
return getString("kafka.offsets.storage");
}
+ public boolean useKafkaTimestamp() {
+ return getBoolean("kafka.useTimestamp", false);
+ }
+
+ public String getKafkaMessageTimestampClass() {
+ return getString("kafka.message.timestamp.className");
+ }
+
public int getGeneration() {
return getInt("secor.generation");
}
@@ -567,7 +575,7 @@ public String getThriftProtocolClass() {
}
public String getMetricsCollectorClass() {
- return mProperties.getString("secor.monitoring.metrics.collector.class");
+ return getString("secor.monitoring.metrics.collector.class");
}
/**
diff --git a/src/main/java/com/pinterest/secor/common/TopicPartition.java b/src/main/java/com/pinterest/secor/common/TopicPartition.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/common/TopicPartitionGroup.java b/src/main/java/com/pinterest/secor/common/TopicPartitionGroup.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java b/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/FileReader.java b/src/main/java/com/pinterest/secor/io/FileReader.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/FileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/FileReaderWriterFactory.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/FileWriter.java b/src/main/java/com/pinterest/secor/io/FileWriter.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/KeyValue.java b/src/main/java/com/pinterest/secor/io/KeyValue.java
old mode 100755
new mode 100644
index c7f5721a9..6dd60f3bd
--- a/src/main/java/com/pinterest/secor/io/KeyValue.java
+++ b/src/main/java/com/pinterest/secor/io/KeyValue.java
@@ -19,21 +19,23 @@
/**
* Generic Object used to read next message from various file reader
* implementations
- *
+ *
* @author Praveen Murugesan (praveen@uber.com)
*
*/
public class KeyValue {
-
+
private final long mOffset;
private final byte[] mKafkaKey;
private final byte[] mValue;
+ private final long mTimestamp;
// constructor
public KeyValue(long offset, byte[] value) {
this.mOffset = offset;
this.mKafkaKey = new byte[0];
this.mValue = value;
+ this.mTimestamp = -1;
}
// constructor
@@ -41,6 +43,15 @@ public KeyValue(long offset, byte[] kafkaKey, byte[] value) {
this.mOffset = offset;
this.mKafkaKey = kafkaKey;
this.mValue = value;
+ this.mTimestamp = -1;
+ }
+
+ // constructor
+ public KeyValue(long offset, byte[] kafkaKey, byte[] value, long timestamp) {
+ this.mOffset = offset;
+ this.mKafkaKey = kafkaKey;
+ this.mValue = value;
+ this.mTimestamp = timestamp;
}
public long getOffset() {
@@ -50,9 +61,20 @@ public long getOffset() {
public byte[] getKafkaKey() {
return this.mKafkaKey;
}
-
+
public byte[] getValue() {
return this.mValue;
}
+ public long getTimestamp() {
+ return this.mTimestamp;
+ }
+
+ public boolean hasKafkaKey() {
+ return this.mKafkaKey != null && this.mKafkaKey.length != 0;
+ }
+
+ public boolean hasTimestamp(){
+ return this.mTimestamp != -1;
+ }
}
diff --git a/src/main/java/com/pinterest/secor/io/impl/DelimitedTextFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/DelimitedTextFileReaderWriterFactory.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/impl/FlexibleDelimitedFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/FlexibleDelimitedFileReaderWriterFactory.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactory.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactory.java
old mode 100755
new mode 100644
index 187f0f207..6a08eb421
--- a/src/main/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactory.java
+++ b/src/main/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactory.java
@@ -44,6 +44,7 @@
public class MessagePackSequenceFileReaderWriterFactory implements FileReaderWriterFactory {
private static final int KAFKA_MESSAGE_OFFSET = 1;
private static final int KAFKA_HASH_KEY = 2;
+ private static final int KAFKA_MESSAGE_TIMESTAMP = 3;
private static final byte[] EMPTY_BYTES = new byte[0];
@Override
@@ -76,6 +77,7 @@ public KeyValue next() throws IOException {
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(mKey.getBytes());
int mapSize = unpacker.unpackMapHeader();
long offset = 0;
+ long timestamp = -1;
byte[] keyBytes = EMPTY_BYTES;
for (int i = 0; i < mapSize; i++) {
int key = unpacker.unpackInt();
@@ -83,6 +85,9 @@ public KeyValue next() throws IOException {
case KAFKA_MESSAGE_OFFSET:
offset = unpacker.unpackLong();
break;
+ case KAFKA_MESSAGE_TIMESTAMP:
+ timestamp = unpacker.unpackLong();
+ break;
case KAFKA_HASH_KEY:
int keySize = unpacker.unpackBinaryHeader();
keyBytes = new byte[keySize];
@@ -91,7 +96,7 @@ public KeyValue next() throws IOException {
}
}
unpacker.close();
- return new KeyValue(offset, keyBytes, Arrays.copyOfRange(mValue.getBytes(), 0, mValue.getLength()));
+ return new KeyValue(offset, keyBytes, Arrays.copyOfRange(mValue.getBytes(), 0, mValue.getLength()), timestamp);
} else {
return null;
}
@@ -131,25 +136,34 @@ public long getLength() throws IOException {
@Override
public void write(KeyValue keyValue) throws IOException {
- byte[] kafkaKey = keyValue.getKafkaKey();
+ byte[] kafkaKey = keyValue.hasKafkaKey() ? keyValue.getKafkaKey() : new byte[0];
+ long timestamp = keyValue.getTimestamp();
+ final int timestampLength = (keyValue.hasTimestamp()) ? 10 : 0;
// output size estimate
// 1 - map header
// 1 - message pack key
// 9 - max kafka offset
// 1 - message pack key
+ // 9 - kafka timestamp
+ // 1 - message pack key
// 5 - max (sane) kafka key size
// N - size of kafka key
- // = 17 + N
- ByteArrayOutputStream out = new ByteArrayOutputStream(17 + kafkaKey.length);
+ // = 27 + N
+ ByteArrayOutputStream out = new ByteArrayOutputStream(17 + timestampLength + kafkaKey.length);
MessagePacker packer = MessagePack.newDefaultPacker(out)
- .packMapHeader((kafkaKey.length == 0) ? 1 : 2)
+ .packMapHeader(numberOfFieldsMappedInHeader(keyValue))
.packInt(KAFKA_MESSAGE_OFFSET)
.packLong(keyValue.getOffset());
- if (kafkaKey.length != 0) {
+
+ if (keyValue.hasTimestamp())
+ packer.packInt(KAFKA_MESSAGE_TIMESTAMP)
+ .packLong(timestamp);
+
+ if (keyValue.hasKafkaKey())
packer.packInt(KAFKA_HASH_KEY)
.packBinaryHeader(kafkaKey.length)
.writePayload(kafkaKey);
- }
+
packer.close();
byte[] outBytes = out.toByteArray();
this.mKey.set(outBytes, 0, outBytes.length);
@@ -161,5 +175,17 @@ public void write(KeyValue keyValue) throws IOException {
public void close() throws IOException {
this.mWriter.close();
}
+
+ private int numberOfFieldsMappedInHeader(KeyValue keyValue) {
+ int fields = 1;
+
+ if (keyValue.hasKafkaKey())
+ fields++;
+
+ if (keyValue.hasTimestamp())
+ fields++;
+
+ return fields;
+ }
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactory.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/impl/SequenceFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/SequenceFileReaderWriterFactory.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactory.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/main/ConsumerMain.java b/src/main/java/com/pinterest/secor/main/ConsumerMain.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/main/LogFilePrinterMain.java b/src/main/java/com/pinterest/secor/main/LogFilePrinterMain.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/main/LogFileVerifierMain.java b/src/main/java/com/pinterest/secor/main/LogFileVerifierMain.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/main/PartitionFinalizerMain.java b/src/main/java/com/pinterest/secor/main/PartitionFinalizerMain.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/main/ProgressMonitorMain.java b/src/main/java/com/pinterest/secor/main/ProgressMonitorMain.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java b/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/main/ZookeeperClientMain.java b/src/main/java/com/pinterest/secor/main/ZookeeperClientMain.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/message/Message.java b/src/main/java/com/pinterest/secor/message/Message.java
old mode 100755
new mode 100644
index 1b51b6063..f355325ed
--- a/src/main/java/com/pinterest/secor/message/Message.java
+++ b/src/main/java/com/pinterest/secor/message/Message.java
@@ -34,13 +34,15 @@ public class Message {
private long mOffset;
private byte[] mKafkaKey;
private byte[] mPayload;
+ private long mTimestamp;
protected String fieldsToString() {
return "topic='" + mTopic + '\'' +
", kafkaPartition=" + mKafkaPartition +
", offset=" + mOffset +
", kafkaKey=" + new String(mKafkaKey) +
- ", payload=" + new String(mPayload);
+ ", payload=" + new String(mPayload) +
+ ", timestamp=" + mTimestamp;
}
@Override
@@ -48,7 +50,7 @@ public String toString() {
return "Message{" + fieldsToString() + '}';
}
- public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload) {
+ public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload, long timestamp) {
mTopic = topic;
mKafkaPartition = kafkaPartition;
mOffset = offset;
@@ -60,6 +62,7 @@ public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, b
if (mPayload == null) {
mPayload = EMPTY_BYTES;
}
+ mTimestamp = timestamp;
}
public String getTopic() {
@@ -82,6 +85,10 @@ public byte[] getPayload() {
return mPayload;
}
+ public long getTimestamp() {
+ return mTimestamp;
+ }
+
public void write(OutputStream output) throws IOException {
output.write(mPayload);
}
diff --git a/src/main/java/com/pinterest/secor/message/ParsedMessage.java b/src/main/java/com/pinterest/secor/message/ParsedMessage.java
old mode 100755
new mode 100644
index 6e212df5c..d3d0bf65e
--- a/src/main/java/com/pinterest/secor/message/ParsedMessage.java
+++ b/src/main/java/com/pinterest/secor/message/ParsedMessage.java
@@ -35,8 +35,8 @@ public String toString() {
}
public ParsedMessage(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload,
- String[] mPartitions) {
- super(topic, kafkaPartition, offset, kafkaKey, payload);
+ String[] mPartitions, long timestamp) {
+ super(topic, kafkaPartition, offset, kafkaKey, payload, timestamp);
this.mPartitions = mPartitions;
}
diff --git a/src/main/java/com/pinterest/secor/monitoring/MetricCollector.java b/src/main/java/com/pinterest/secor/monitoring/MetricCollector.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/monitoring/OstrichMetricCollector.java b/src/main/java/com/pinterest/secor/monitoring/OstrichMetricCollector.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/parser/DailyOffsetMessageParser.java b/src/main/java/com/pinterest/secor/parser/DailyOffsetMessageParser.java
old mode 100755
new mode 100644
index e653c6695..1ea49b568
--- a/src/main/java/com/pinterest/secor/parser/DailyOffsetMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/DailyOffsetMessageParser.java
@@ -16,14 +16,11 @@
*/
package com.pinterest.secor.parser;
-import org.apache.commons.lang.StringUtils;
-
-import java.util.Date;
-
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
+import org.apache.commons.lang.StringUtils;
-
+import java.util.Date;
/**
* Offset message parser groups messages based on the offset ranges.
diff --git a/src/main/java/com/pinterest/secor/parser/DateMessageParser.java b/src/main/java/com/pinterest/secor/parser/DateMessageParser.java
old mode 100755
new mode 100644
index e47f47406..0eb3606da
--- a/src/main/java/com/pinterest/secor/parser/DateMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/DateMessageParser.java
@@ -82,5 +82,4 @@ public String[] extractPartitions(Message message) {
return result;
}
-
}
diff --git a/src/main/java/com/pinterest/secor/parser/Iso8601MessageParser.java b/src/main/java/com/pinterest/secor/parser/Iso8601MessageParser.java
old mode 100755
new mode 100644
index 7b591eaf4..9a61e3105
--- a/src/main/java/com/pinterest/secor/parser/Iso8601MessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/Iso8601MessageParser.java
@@ -16,18 +16,15 @@
*/
package com.pinterest.secor.parser;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
+import com.pinterest.secor.common.SecorConfig;
+import com.pinterest.secor.message.Message;
import net.minidev.json.JSONObject;
import net.minidev.json.JSONValue;
-import javax.xml.bind.DatatypeConverter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.pinterest.secor.common.SecorConfig;
-import com.pinterest.secor.message.Message;
+import javax.xml.bind.DatatypeConverter;
+import java.util.Date;
/**
* Iso8601MessageParser extracts timestamp field (specified by 'message.timestamp.name')
@@ -35,38 +32,34 @@
* @author Jurriaan Pruis (email@jurriaanpruis.nl)
*
*/
-public class Iso8601MessageParser extends MessageParser {
- private static final Logger LOG = LoggerFactory.getLogger(Iso8601MessageParser.class);
- protected static final String defaultDate = "dt=1970-01-01";
- protected static final String defaultFormatter = "yyyy-MM-dd";
- protected static final SimpleDateFormat outputFormatter = new SimpleDateFormat(defaultFormatter);
+public class Iso8601MessageParser extends TimestampedMessageParser {
+ private final boolean m_timestampRequired;
public Iso8601MessageParser(SecorConfig config) {
super(config);
- outputFormatter.setTimeZone(config.getTimeZone());
+ m_timestampRequired = config.isMessageTimestampRequired();
}
@Override
- public String[] extractPartitions(Message message) throws Exception {
+ public long extractTimestampMillis(final Message message) {
JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload());
- String result[] = { defaultDate };
+ Object fieldValue = jsonObject != null ? getJsonFieldValue(jsonObject) : null;
- if (jsonObject != null) {
- Object fieldValue = getJsonFieldValue(jsonObject);
- if (fieldValue == null) {
- LOG.warn("Missing field value. Using default partition = {}", defaultDate);
- } else {
- try {
- Date dateFormat = DatatypeConverter.parseDateTime(fieldValue.toString()).getTime();
- result[0] = "dt=" + outputFormatter.format(dateFormat);
- } catch (Exception e) {
- LOG.warn("Impossible to convert date = {} as ISO-8601. Using date default = {}",
- fieldValue.toString(), result[0]);
+ if (m_timestampRequired && fieldValue == null) {
+ throw new RuntimeException("Missing timestamp field for message: " + message);
+ }
+
+ if (fieldValue != null) {
+ try {
+ Date dateFormat = DatatypeConverter.parseDateTime(fieldValue.toString()).getTime();
+ return dateFormat.getTime();
+ } catch (IllegalArgumentException ex) {
+ if (m_timestampRequired){
+ throw new RuntimeException("Bad timestamp field for message: " + message);
}
}
}
- return result;
+ return 0;
}
-
}
diff --git a/src/main/java/com/pinterest/secor/parser/JsonMessageParser.java b/src/main/java/com/pinterest/secor/parser/JsonMessageParser.java
old mode 100755
new mode 100644
index eab1e3c57..0d09b7e34
--- a/src/main/java/com/pinterest/secor/parser/JsonMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/JsonMessageParser.java
@@ -46,5 +46,4 @@ public long extractTimestampMillis(final Message message) {
}
return 0;
}
-
}
diff --git a/src/main/java/com/pinterest/secor/parser/MessagePackParser.java b/src/main/java/com/pinterest/secor/parser/MessagePackParser.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/parser/MessageParser.java b/src/main/java/com/pinterest/secor/parser/MessageParser.java
old mode 100755
new mode 100644
index da10b7016..f21e4eeb3
--- a/src/main/java/com/pinterest/secor/parser/MessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/MessageParser.java
@@ -19,10 +19,10 @@
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;
+import net.minidev.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import net.minidev.json.JSONObject;
-import net.minidev.json.JSONValue;
+
import java.util.regex.Pattern;
// TODO(pawel): should we offer a multi-message parser capable of parsing multiple types of
@@ -60,7 +60,7 @@ public ParsedMessage parse(Message message) throws Exception {
String[] partitions = extractPartitions(message);
return new ParsedMessage(message.getTopic(), message.getKafkaPartition(),
message.getOffset(), message.getKafkaKey(),
- message.getPayload(), partitions);
+ message.getPayload(), partitions, message.getTimestamp());
}
public abstract String[] extractPartitions(Message payload) throws Exception;
diff --git a/src/main/java/com/pinterest/secor/parser/OffsetMessageParser.java b/src/main/java/com/pinterest/secor/parser/OffsetMessageParser.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
old mode 100755
new mode 100644
index bac754d78..3ab672895
--- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
+++ b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java
@@ -192,7 +192,6 @@ private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throw
LOG.info("touching file {}", successFilePath);
FileUtil.touch(successFilePath);
}
-
}
public void finalizePartitions() throws Exception {
diff --git a/src/main/java/com/pinterest/secor/parser/PartitionedMessageParser.java b/src/main/java/com/pinterest/secor/parser/PartitionedMessageParser.java
old mode 100755
new mode 100644
index 508db3f1c..335203caf
--- a/src/main/java/com/pinterest/secor/parser/PartitionedMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/PartitionedMessageParser.java
@@ -16,14 +16,11 @@
*/
package com.pinterest.secor.parser;
-import org.apache.commons.lang.StringUtils;
-
-import java.util.Date;
-
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
+import org.apache.commons.lang.StringUtils;
-
+import java.util.Date;
/**
* Offset message parser groups messages based on the offset ranges.
@@ -48,5 +45,4 @@ public String[] extractPartitions(Message message) throws Exception {
public long extractTimestampMillis(final Message message) {
return new Date().getTime(); //Daily Timestamp generation
}
-
}
diff --git a/src/main/java/com/pinterest/secor/parser/Partitioner.java b/src/main/java/com/pinterest/secor/parser/Partitioner.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/parser/ProtobufMessageParser.java b/src/main/java/com/pinterest/secor/parser/ProtobufMessageParser.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/parser/QuboleClient.java b/src/main/java/com/pinterest/secor/parser/QuboleClient.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/parser/RegexMessageParser.java b/src/main/java/com/pinterest/secor/parser/RegexMessageParser.java
old mode 100755
new mode 100644
index 11a8a41cc..04fc482ee
--- a/src/main/java/com/pinterest/secor/parser/RegexMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/RegexMessageParser.java
@@ -18,11 +18,11 @@
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* RegexMessageParser extracts timestamp field (specified by 'message.timestamp.input.pattern')
@@ -54,5 +54,4 @@ public long extractTimestampMillis(final Message message) {
}
throw new NumberFormatException("Cannot find timestamp field in: " + line);
}
-
}
diff --git a/src/main/java/com/pinterest/secor/parser/SplitByFieldMessageParser.java b/src/main/java/com/pinterest/secor/parser/SplitByFieldMessageParser.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/parser/ThriftMessageParser.java b/src/main/java/com/pinterest/secor/parser/ThriftMessageParser.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
old mode 100755
new mode 100644
index 03910764c..ec6593ae3
--- a/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
+++ b/src/main/java/com/pinterest/secor/parser/TimestampedMessageParser.java
@@ -16,8 +16,6 @@
*/
package com.pinterest.secor.parser;
-import com.google.protobuf.Timestamp;
-import com.google.protobuf.util.Timestamps;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
import org.slf4j.Logger;
@@ -58,12 +56,15 @@ public abstract class TimestampedMessageParser extends MessageParser implements
protected final boolean mUsingHourly;
protected final boolean mUsingMinutely;
+ protected final boolean mUseKafkaTimestamp;
+
public TimestampedMessageParser(SecorConfig config) {
super(config);
mUsingHourly = usingHourly(config);
mUsingMinutely = usingMinutely(config);
+ mUseKafkaTimestamp = useKafkaTimestamp(config);
mDtFormat = usingDateFormat(config);
mHrFormat = usingHourFormat(config);
mMinFormat = usingMinuteFormat(config);
@@ -91,7 +92,6 @@ public TimestampedMessageParser(SecorConfig config) {
mDtHrMinFormatter = new SimpleDateFormat(mDtFormat+ "-" + mHrFormat + "-" + mMinFormat);
mDtHrMinFormatter.setTimeZone(config.getTimeZone());
-
}
static boolean usingHourly(SecorConfig config) {
@@ -126,6 +126,10 @@ static String usingMinutePrefix(SecorConfig config) {
return config.getString("partitioner.granularity.minute.prefix", "min=");
}
+ static boolean useKafkaTimestamp(SecorConfig config) {
+ return config.useKafkaTimestamp();
+ }
+
protected static long toMillis(final long timestamp) {
final long nanosecondDivider = (long) Math.pow(10, 9 + 9);
final long microsecondDivider = (long) Math.pow(10, 9 + 6);
@@ -145,6 +149,10 @@ protected static long toMillis(final long timestamp) {
public abstract long extractTimestampMillis(final Message message) throws Exception;
+ public long getTimestampMillis(Message message) throws Exception {
+ return (mUseKafkaTimestamp) ? toMillis(message.getTimestamp()) : extractTimestampMillis(message);
+ }
+
protected String[] generatePartitions(long timestampMillis, boolean usingHourly, boolean usingMinutely)
throws Exception {
Date date = new Date(timestampMillis);
@@ -172,14 +180,14 @@ protected long parsePartitions(String[] partitions) throws Exception {
@Override
public String[] extractPartitions(Message message) throws Exception {
// Date constructor takes milliseconds since epoch.
- long timestampMillis = extractTimestampMillis(message);
+ long timestampMillis = getTimestampMillis(message);
return generatePartitions(timestampMillis, mUsingHourly, mUsingMinutely);
}
private long getFinalizedTimestampMillis(Message lastMessage,
Message committedMessage) throws Exception {
- long lastTimestamp = extractTimestampMillis(lastMessage);
- long committedTimestamp = extractTimestampMillis(committedMessage);
+ long lastTimestamp = getTimestampMillis(lastMessage);
+ long committedTimestamp = getTimestampMillis(committedMessage);
long now = System.currentTimeMillis();
if (lastTimestamp == committedTimestamp &&
(now - lastTimestamp) > mFinalizerDelaySeconds * 1000) {
@@ -216,8 +224,8 @@ public String[] getFinalizedUptoPartitions(List lastMessages,
minMillis -= mFinalizerDelaySeconds * 1000L;
LOG.info("adjusted millis {}", minMillis);
return generatePartitions(minMillis, mUsingHourly, mUsingMinutely);
-
}
+
@Override
public String[] getPreviousPartitions(String[] partitions) throws Exception {
long millis = parsePartitions(partitions);
@@ -269,4 +277,4 @@ public String[] getPreviousPartitions(String[] partitions) throws Exception {
}
return generatePartitions(millis, usingHourly, usingMinutely);
}
- }
+}
diff --git a/src/main/java/com/pinterest/secor/reader/MessageReader.java b/src/main/java/com/pinterest/secor/reader/MessageReader.java
old mode 100755
new mode 100644
index 4efcb6512..b99813522
--- a/src/main/java/com/pinterest/secor/reader/MessageReader.java
+++ b/src/main/java/com/pinterest/secor/reader/MessageReader.java
@@ -20,6 +20,7 @@
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.message.Message;
+import com.pinterest.secor.timestamp.KafkaMessageTimestampFactory;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.RateLimitUtil;
import com.pinterest.secor.util.StatsUtil;
@@ -58,6 +59,7 @@ public class MessageReader {
protected final int mTopicPartitionForgetSeconds;
protected final int mCheckMessagesPerSecond;
protected int mNMessages;
+ protected KafkaMessageTimestampFactory mKafkaMessageTimestampFactory;
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws
UnknownHostException {
@@ -80,6 +82,7 @@ public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws
StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();
mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads();
+ mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
private void updateAccessTime(TopicPartition topicPartition) {
@@ -160,9 +163,13 @@ public Message read() {
RateLimitUtil.acquire(mCheckMessagesPerSecond);
}
MessageAndMetadata kafkaMessage = mIterator.next();
+
+ long timestamp = (mConfig.useKafkaTimestamp())
+ ? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(kafkaMessage)
+ : 0l;
Message message = new Message(kafkaMessage.topic(), kafkaMessage.partition(),
kafkaMessage.offset(), kafkaMessage.key(),
- kafkaMessage.message());
+ kafkaMessage.message(), timestamp);
TopicPartition topicPartition = new TopicPartition(message.getTopic(),
message.getKafkaPartition());
updateAccessTime(topicPartition);
diff --git a/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java
new file mode 100644
index 000000000..043901ef5
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java
@@ -0,0 +1,17 @@
+package com.pinterest.secor.timestamp;
+
+import kafka.message.MessageAndMetadata;
+import kafka.message.MessageAndOffset;
+
+public class Kafka10MessageTimestamp implements KafkaMessageTimestamp {
+
+ @Override
+ public long getTimestamp(MessageAndMetadata kafkaMessage) {
+ return kafkaMessage.timestamp();
+ }
+
+ @Override
+ public long getTimestamp(MessageAndOffset messageAndOffset) {
+ return messageAndOffset.message().timestamp();
+ }
+}
diff --git a/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java
new file mode 100644
index 000000000..bc461734e
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java
@@ -0,0 +1,17 @@
+package com.pinterest.secor.timestamp;
+
+import kafka.message.MessageAndMetadata;
+import kafka.message.MessageAndOffset;
+
+public class Kafka8MessageTimestamp implements KafkaMessageTimestamp {
+
+ @Override
+ public long getTimestamp(MessageAndMetadata kafkaMessage) {
+ return 0l;
+ }
+
+ @Override
+ public long getTimestamp(MessageAndOffset messageAndOffset) {
+ return 0l;
+ }
+}
diff --git a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java
new file mode 100644
index 000000000..4d53b13a9
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java
@@ -0,0 +1,11 @@
+package com.pinterest.secor.timestamp;
+
+import kafka.message.MessageAndMetadata;
+import kafka.message.MessageAndOffset;
+
+public interface KafkaMessageTimestamp {
+
+ long getTimestamp(MessageAndMetadata kafkaMessage);
+
+ long getTimestamp(MessageAndOffset messageAndOffset);
+}
diff --git a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java b/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java
new file mode 100644
index 000000000..cc9a760cc
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java
@@ -0,0 +1,23 @@
+package com.pinterest.secor.timestamp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaMessageTimestampFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageTimestampFactory.class);
+
+ private KafkaMessageTimestamp kafkaMessageTimestamp;
+
+ public KafkaMessageTimestampFactory(String kafkaTimestampClassName) {
+ try {
+ Class timestampClass = Class.forName(kafkaTimestampClassName);
+ this.kafkaMessageTimestamp = KafkaMessageTimestamp.class.cast(timestampClass.newInstance());
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public KafkaMessageTimestamp getKafkaMessageTimestamp() {
+ return this.kafkaMessageTimestamp;
+ }
+}
diff --git a/src/main/java/com/pinterest/secor/tools/LogFileDeleter.java b/src/main/java/com/pinterest/secor/tools/LogFileDeleter.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/tools/LogFilePrinter.java b/src/main/java/com/pinterest/secor/tools/LogFilePrinter.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/tools/LogFileVerifier.java b/src/main/java/com/pinterest/secor/tools/LogFileVerifier.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/tools/ProgressMonitor.java b/src/main/java/com/pinterest/secor/tools/ProgressMonitor.java
old mode 100755
new mode 100644
index f9f06c140..aef5b0098
--- a/src/main/java/com/pinterest/secor/tools/ProgressMonitor.java
+++ b/src/main/java/com/pinterest/secor/tools/ProgressMonitor.java
@@ -235,7 +235,7 @@ private String metricName(String key) {
private long getTimestamp(Message message) throws Exception {
if (mMessageParser instanceof TimestampedMessageParser) {
- return ((TimestampedMessageParser)mMessageParser).extractTimestampMillis(message);
+ return ((TimestampedMessageParser)mMessageParser).getTimestampMillis(message);
} else {
return -1;
}
diff --git a/src/main/java/com/pinterest/secor/tools/RandomPartitioner.java b/src/main/java/com/pinterest/secor/tools/RandomPartitioner.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java b/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/transformer/IdentityMessageTransformer.java b/src/main/java/com/pinterest/secor/transformer/IdentityMessageTransformer.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/transformer/MessageTransformer.java b/src/main/java/com/pinterest/secor/transformer/MessageTransformer.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/AzureUploadManager.java b/src/main/java/com/pinterest/secor/uploader/AzureUploadManager.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/FutureHandle.java b/src/main/java/com/pinterest/secor/uploader/FutureHandle.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java b/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/HadoopS3UploadManager.java b/src/main/java/com/pinterest/secor/uploader/HadoopS3UploadManager.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/Handle.java b/src/main/java/com/pinterest/secor/uploader/Handle.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/S3UploadHandle.java b/src/main/java/com/pinterest/secor/uploader/S3UploadHandle.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java b/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/UploadManager.java b/src/main/java/com/pinterest/secor/uploader/UploadManager.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/CompressionUtil.java b/src/main/java/com/pinterest/secor/util/CompressionUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/FileUtil.java b/src/main/java/com/pinterest/secor/util/FileUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/IdUtil.java b/src/main/java/com/pinterest/secor/util/IdUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/ParquetUtil.java b/src/main/java/com/pinterest/secor/util/ParquetUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/ProtobufUtil.java b/src/main/java/com/pinterest/secor/util/ProtobufUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/RateLimitUtil.java b/src/main/java/com/pinterest/secor/util/RateLimitUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/ReflectionUtil.java b/src/main/java/com/pinterest/secor/util/ReflectionUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/StatsUtil.java b/src/main/java/com/pinterest/secor/util/StatsUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/ThriftUtil.java b/src/main/java/com/pinterest/secor/util/ThriftUtil.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/orc/JsonFieldFiller.java b/src/main/java/com/pinterest/secor/util/orc/JsonFieldFiller.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/orc/VectorColumnFiller.java b/src/main/java/com/pinterest/secor/util/orc/VectorColumnFiller.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/orc/schema/DefaultORCSchemaProvider.java b/src/main/java/com/pinterest/secor/util/orc/schema/DefaultORCSchemaProvider.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/util/orc/schema/ORCScehmaProvider.java b/src/main/java/com/pinterest/secor/util/orc/schema/ORCScehmaProvider.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/pinterest/secor/writer/MessageWriter.java b/src/main/java/com/pinterest/secor/writer/MessageWriter.java
old mode 100755
new mode 100644
index d29321fcf..090c5a544
--- a/src/main/java/com/pinterest/secor/writer/MessageWriter.java
+++ b/src/main/java/com/pinterest/secor/writer/MessageWriter.java
@@ -92,7 +92,7 @@ public void write(ParsedMessage message) throws Exception {
LogFilePath path = new LogFilePath(mLocalPrefix, mGeneration, offset, message,
mFileExtension);
FileWriter writer = mFileRegistry.getOrCreateWriter(path, mCodec);
- writer.write(new KeyValue(message.getOffset(), message.getKafkaKey(), message.getPayload()));
+ writer.write(new KeyValue(message.getOffset(), message.getKafkaKey(), message.getPayload(), message.getTimestamp()));
LOG.debug("appended message {} to file {}. File length {}",
message, path, writer.getLength());
}
diff --git a/src/main/scripts/docker-entrypoint.sh b/src/main/scripts/docker-entrypoint.sh
old mode 100755
new mode 100644
diff --git a/src/main/thrift/secor.thrift b/src/main/thrift/secor.thrift
old mode 100755
new mode 100644
diff --git a/src/test/config/core-site.xml b/src/test/config/core-site.xml
old mode 100755
new mode 100644
diff --git a/src/test/config/jets3t.properties b/src/test/config/jets3t.properties
old mode 100755
new mode 100644
diff --git a/src/test/config/secor.kafka.migration.test.properties b/src/test/config/secor.kafka.migration.test.properties
old mode 100755
new mode 100644
diff --git a/src/test/config/secor.test.monitoring.properties b/src/test/config/secor.test.monitoring.properties
old mode 100755
new mode 100644
diff --git a/src/test/config/secor.test.protobuf.properties b/src/test/config/secor.test.protobuf.properties
old mode 100755
new mode 100644
diff --git a/src/test/config/test.s3cfg b/src/test/config/test.s3cfg
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/common/LogFilePathTest.java b/src/test/java/com/pinterest/secor/common/LogFilePathTest.java
old mode 100755
new mode 100644
index 9af8887e2..1f8b1c89a
--- a/src/test/java/com/pinterest/secor/common/LogFilePathTest.java
+++ b/src/test/java/com/pinterest/secor/common/LogFilePathTest.java
@@ -41,17 +41,19 @@ public class LogFilePathTest extends TestCase {
".10_0_00000000000000000100.crc";
private LogFilePath mLogFilePath;
+ private long timestamp;
@Override
protected void setUp() throws Exception {
super.setUp();
mLogFilePath = new LogFilePath(PREFIX, TOPIC, PARTITIONS, GENERATION, KAFKA_PARTITION,
LAST_COMMITTED_OFFSET, "");
+ timestamp = System.currentTimeMillis();
}
public void testConstructFromMessage() throws Exception {
ParsedMessage message = new ParsedMessage(TOPIC, KAFKA_PARTITION, 1000, null,
- "some_payload".getBytes(), PARTITIONS);
+ "some_payload".getBytes(), PARTITIONS, timestamp);
LogFilePath logFilePath = new LogFilePath(PREFIX, GENERATION, LAST_COMMITTED_OFFSET,
message, "");
assertEquals(PATH, logFilePath.getLogFilePath());
diff --git a/src/test/java/com/pinterest/secor/common/SecorConfigTest.java b/src/test/java/com/pinterest/secor/common/SecorConfigTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/common/ZookeeperConnectorTest.java b/src/test/java/com/pinterest/secor/common/ZookeeperConnectorTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactoryTest.java
old mode 100755
new mode 100644
index cccdc8f70..84bad994b
--- a/src/test/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactoryTest.java
+++ b/src/test/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactoryTest.java
@@ -43,8 +43,14 @@ public void testMessagePackSequenceReadWriteRoundTrip() throws Exception {
FileWriter fileWriter = factory.BuildFileWriter(tempLogFilePath, null);
KeyValue kv1 = (new KeyValue(23232, new byte[]{44, 55, 66, 77, 88}, new byte[]{23, 45, 40 ,10, 122}));
KeyValue kv2 = (new KeyValue(23233, new byte[]{2, 3, 4, 5}));
+ KeyValue kv3 = (new KeyValue(23234, new byte[]{44, 55, 66, 77, 88}, new byte[]{23, 45, 40 ,10, 122}, 1496318250l));
+ KeyValue kv4 = (new KeyValue(23235, null, new byte[]{23, 45, 40 ,10, 122}, 1496318250l));
+
fileWriter.write(kv1);
fileWriter.write(kv2);
+ fileWriter.write(kv3);
+ fileWriter.write(kv4);
+
fileWriter.close();
FileReader fileReader = factory.BuildFileReader(tempLogFilePath, null);
@@ -56,7 +62,17 @@ public void testMessagePackSequenceReadWriteRoundTrip() throws Exception {
assertEquals(kv2.getOffset(), kvout.getOffset());
assertArrayEquals(kv2.getKafkaKey(), kvout.getKafkaKey());
assertArrayEquals(kv2.getValue(), kvout.getValue());
- }
+ kvout = fileReader.next();
+ assertEquals(kv3.getOffset(), kvout.getOffset());
+ assertArrayEquals(kv3.getKafkaKey(), kvout.getKafkaKey());
+ assertArrayEquals(kv3.getValue(), kvout.getValue());
+ assertEquals(kv3.getTimestamp(), kvout.getTimestamp());
+ kvout = fileReader.next();
+ assertEquals(kv4.getOffset(), kvout.getOffset());
+ assertArrayEquals(new byte[0], kvout.getKafkaKey());
+ assertArrayEquals(kv4.getValue(), kvout.getValue());
+ assertEquals(kv4.getTimestamp(), kvout.getTimestamp());
+ }
}
\ No newline at end of file
diff --git a/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/io/impl/SequenceFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/SequenceFileReaderWriterFactoryTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/message/MessageTest.java b/src/test/java/com/pinterest/secor/message/MessageTest.java
old mode 100755
new mode 100644
index c02ffaea7..d4eb8c59b
--- a/src/test/java/com/pinterest/secor/message/MessageTest.java
+++ b/src/test/java/com/pinterest/secor/message/MessageTest.java
@@ -1,14 +1,12 @@
package com.pinterest.secor.message;
-import static org.junit.Assert.*;
-
import org.junit.Test;
public class MessageTest {
@Test
public void testNullPayload() {
- Message message = new Message("testTopic", 0, 123, null, null);
+ Message message = new Message("testTopic", 0, 123, null, null, 0l);
System.out.println(message);
// no assert necessary, just making sure it does not throw a
diff --git a/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java b/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java
old mode 100755
new mode 100644
index 373489bed..7e9f2225a
--- a/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java
@@ -16,18 +16,15 @@
*/
package com.pinterest.secor.parser;
+import com.pinterest.secor.common.SecorConfig;
+import com.pinterest.secor.message.Message;
import junit.framework.TestCase;
-
-import java.util.TimeZone;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
-import org.mockito.stubbing.OngoingStubbing;
import org.powermock.modules.junit4.PowerMockRunner;
-import com.pinterest.secor.common.SecorConfig;
-import com.pinterest.secor.message.Message;
+import java.util.TimeZone;
@RunWith(PowerMockRunner.class)
public class DateMessageParserTest extends TestCase {
@@ -40,40 +37,42 @@ public class DateMessageParserTest extends TestCase {
private Message mISOFormat;
private Message mNanosecondISOFormat;
private Message mNestedISOFormat;
- private OngoingStubbing getTimestamp;
+ private long timestamp;
@Override
public void setUp() throws Exception {
mConfig = Mockito.mock(SecorConfig.class);
Mockito.when(mConfig.getTimeZone()).thenReturn(TimeZone.getTimeZone("UTC"));
+ timestamp = System.currentTimeMillis();
+
byte format1[] = "{\"timestamp\":\"2014-07-30 10:53:20\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mFormat1 = new Message("test", 0, 0, null, format1);
+ mFormat1 = new Message("test", 0, 0, null, format1, timestamp);
byte format2[] = "{\"timestamp\":\"2014/10/25\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mFormat2 = new Message("test", 0, 0, null, format2);
+ mFormat2 = new Message("test", 0, 0, null, format2, timestamp);
byte format3[] = "{\"timestamp\":\"02001.July.04 AD 12:08 PM\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mFormat3 = new Message("test", 0, 0, null, format3);
+ mFormat3 = new Message("test", 0, 0, null, format3, timestamp);
byte invalidDate[] = "{\"timestamp\":\"11111111\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mInvalidDate = new Message("test", 0, 0, null, invalidDate);
+ mInvalidDate = new Message("test", 0, 0, null, invalidDate, timestamp);
byte isoFormat[] = "{\"timestamp\":\"2006-01-02T15:04:05Z\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mISOFormat = new Message("test", 0, 0, null, isoFormat);
+ mISOFormat = new Message("test", 0, 0, null, isoFormat, timestamp);
byte nanosecondISOFormat[] = "{\"timestamp\":\"2006-01-02T23:59:59.999999999Z\"}"
.getBytes("UTF-8");
- mNanosecondISOFormat = new Message("test", 0, 0, null, nanosecondISOFormat);
+ mNanosecondISOFormat = new Message("test", 0, 0, null, nanosecondISOFormat, timestamp);
byte nestedISOFormat[] = "{\"meta_data\":{\"created\":\"2016-01-11T11:50:28.647Z\"},\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mNestedISOFormat = new Message("test", 0, 0, null, nestedISOFormat);
+ mNestedISOFormat = new Message("test", 0, 0, null, nestedISOFormat, timestamp);
}
@Test
diff --git a/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java b/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java
old mode 100755
new mode 100644
index e3e412e62..67233a87c
--- a/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java
@@ -16,18 +16,15 @@
*/
package com.pinterest.secor.parser;
+import com.pinterest.secor.common.SecorConfig;
+import com.pinterest.secor.message.Message;
import junit.framework.TestCase;
-
-import java.util.TimeZone;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
-import org.mockito.stubbing.OngoingStubbing;
import org.powermock.modules.junit4.PowerMockRunner;
-import com.pinterest.secor.common.SecorConfig;
-import com.pinterest.secor.message.Message;
+import java.util.TimeZone;
@RunWith(PowerMockRunner.class)
public class Iso8601ParserTest extends TestCase {
@@ -41,63 +38,79 @@ public class Iso8601ParserTest extends TestCase {
private Message mNestedISOFormat;
private Message mNanosecondISOFormat;
private Message mMissingDate;
- private OngoingStubbing getTimestamp;
+ private long timestamp;
@Override
public void setUp() throws Exception {
mConfig = Mockito.mock(SecorConfig.class);
+ Mockito.when(mConfig.getMessageTimestampName()).thenReturn("timestamp");
+ Mockito.when(mConfig.getFinalizerDelaySeconds()).thenReturn(3600);
Mockito.when(mConfig.getTimeZone()).thenReturn(TimeZone.getTimeZone("UTC"));
+ Mockito.when(TimestampedMessageParser.usingDateFormat(mConfig)).thenReturn("yyyy-MM-dd");
+ Mockito.when(TimestampedMessageParser.usingHourFormat(mConfig)).thenReturn("HH");
+ Mockito.when(TimestampedMessageParser.usingMinuteFormat(mConfig)).thenReturn("mm");
+ Mockito.when(TimestampedMessageParser.usingDatePrefix(mConfig)).thenReturn("dt=");
+ Mockito.when(TimestampedMessageParser.usingHourPrefix(mConfig)).thenReturn("hr=");
+ Mockito.when(TimestampedMessageParser.usingMinutePrefix(mConfig)).thenReturn("min=");
+
+ timestamp = System.currentTimeMillis();
+
byte format1[] = "{\"timestamp\":\"2014-07-30T10:53:20.001Z\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mFormat1 = new Message("test", 0, 0, null, format1);
+ mFormat1 = new Message("test", 0, 0, null, format1, timestamp);
byte format2[] = "{\"timestamp\":\"2014-07-29T10:53:20Z\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mFormat2 = new Message("test", 0, 0, null, format2);
+ mFormat2 = new Message("test", 0, 0, null, format2, timestamp);
byte format3[] = "{\"timestamp\":\"2001-07-04Z\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mFormat3 = new Message("test", 0, 0, null, format3);
+ mFormat3 = new Message("test", 0, 0, null, format3, timestamp);
byte format4[] = "{\"timestamp\":\"2016-03-02T18:36:14+00:00\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mFormat4 = new Message("test", 0, 0, null, format4);
+ mFormat4 = new Message("test", 0, 0, null, format4, timestamp);
byte nestedISOFormat[] = "{\"meta_data\":{\"created\":\"2016-01-11T11:50:28.647Z\"},\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mNestedISOFormat = new Message("test", 0, 0, null, nestedISOFormat);
+ mNestedISOFormat = new Message("test", 0, 0, null, nestedISOFormat, timestamp);
byte nanosecondISOFormat[] = "{\"timestamp\":\"2006-01-02T23:59:59.999999999Z\"}"
.getBytes("UTF-8");
- mNanosecondISOFormat = new Message("test", 0, 0, null, nanosecondISOFormat);
+ mNanosecondISOFormat = new Message("test", 0, 0, null, nanosecondISOFormat, timestamp);
byte invalidDate[] = "{\"timestamp\":\"111-11111111\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mInvalidDate = new Message("test", 0, 0, null, invalidDate);
+ mInvalidDate = new Message("test", 0, 0, null, invalidDate, timestamp);
byte missingDate[] = "{\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}"
.getBytes("UTF-8");
- mMissingDate = new Message("test", 0, 0, null, missingDate);
+ mMissingDate = new Message("test", 0, 0, null, missingDate, timestamp);
}
@Test
- public void testExtractDate() throws Exception {
- Mockito.when(mConfig.getMessageTimestampName()).thenReturn("timestamp");
- assertEquals("dt=2014-07-30", new Iso8601MessageParser(mConfig).extractPartitions(mFormat1)[0]);
- assertEquals("dt=2014-07-29", new Iso8601MessageParser(mConfig).extractPartitions(mFormat2)[0]);
- assertEquals("dt=2001-07-04", new Iso8601MessageParser(mConfig).extractPartitions(mFormat3)[0]);
- assertEquals("dt=2016-03-02", new Iso8601MessageParser(mConfig).extractPartitions(mFormat4)[0]);
- assertEquals("dt=2006-01-02", new Iso8601MessageParser(mConfig).extractPartitions(mNanosecondISOFormat)[0]);
- assertEquals("dt=1970-01-01", new Iso8601MessageParser(mConfig).extractPartitions(mInvalidDate)[0]);
- assertEquals("dt=1970-01-01", new Iso8601MessageParser(mConfig).extractPartitions(mMissingDate)[0]);
+ public void testExtractTimestampMillis() throws Exception {
+ Iso8601MessageParser parser = new Iso8601MessageParser(mConfig);
+
+ assertEquals(1406717600001l, parser.getTimestampMillis(mFormat1));
+ assertEquals(1406631200000l, parser.getTimestampMillis(mFormat2));
+ assertEquals(994204800000l, parser.getTimestampMillis(mFormat3));
+ assertEquals(1456943774000l, parser.getTimestampMillis(mFormat4));
+ assertEquals(1136246399999l, parser.getTimestampMillis(mNanosecondISOFormat));
+
+ // Return 0 if there's no timestamp, for any reason.
+ assertEquals(0l, parser.getTimestampMillis(mInvalidDate));
+ assertEquals(0l, parser.getTimestampMillis(mMissingDate));
}
@Test
- public void testNestedField() throws Exception {
+ public void testExtractNestedTimestampMillis() throws Exception {
Mockito.when(mConfig.getMessageTimestampNameSeparator()).thenReturn(".");
Mockito.when(mConfig.getMessageTimestampName()).thenReturn("meta_data.created");
- assertEquals("dt=2016-01-11", new Iso8601MessageParser(mConfig).extractPartitions(mNestedISOFormat)[0]);
+ Iso8601MessageParser parser = new Iso8601MessageParser(mConfig);
+
+ assertEquals(1452513028647l, parser.getTimestampMillis(mNestedISOFormat));
}
}
diff --git a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java
old mode 100755
new mode 100644
index 964287d99..cdff37f55
--- a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java
@@ -38,6 +38,7 @@ public class JsonMessageParserTest extends TestCase {
private Message mMessageWithMillisFloatTimestamp;
private Message mMessageWithoutTimestamp;
private Message mMessageWithNestedTimestamp;
+ private long timestamp;
@Override
public void setUp() throws Exception {
@@ -46,45 +47,57 @@ public void setUp() throws Exception {
Mockito.when(mConfig.getFinalizerDelaySeconds()).thenReturn(3600);
Mockito.when(mConfig.getTimeZone()).thenReturn(TimeZone.getTimeZone("UTC"));
- Mockito.when(TimestampedMessageParser.usingDateFormat(mConfig)).thenReturn("yyyy-MM-dd");
- Mockito.when(TimestampedMessageParser.usingHourFormat(mConfig)).thenReturn("HH");
- Mockito.when(TimestampedMessageParser.usingMinuteFormat(mConfig)).thenReturn("mm");
- Mockito.when(TimestampedMessageParser.usingDatePrefix(mConfig)).thenReturn("dt=");
- Mockito.when(TimestampedMessageParser.usingHourPrefix(mConfig)).thenReturn("hr=");
- Mockito.when(TimestampedMessageParser.usingMinutePrefix(mConfig)).thenReturn("min=");
+ Mockito.when(TimestampedMessageParser.usingDateFormat(mConfig)).thenReturn("yyyy-MM-dd");
+ Mockito.when(TimestampedMessageParser.usingHourFormat(mConfig)).thenReturn("HH");
+ Mockito.when(TimestampedMessageParser.usingMinuteFormat(mConfig)).thenReturn("mm");
+ Mockito.when(TimestampedMessageParser.usingDatePrefix(mConfig)).thenReturn("dt=");
+ Mockito.when(TimestampedMessageParser.usingHourPrefix(mConfig)).thenReturn("hr=");
+ Mockito.when(TimestampedMessageParser.usingMinutePrefix(mConfig)).thenReturn("min=");
- byte messageWithSecondsTimestamp[] =
+ timestamp = System.currentTimeMillis();
+
+ byte messageWithSecondsTimestamp[] =
"{\"timestamp\":\"1405911096\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
- mMessageWithSecondsTimestamp = new Message("test", 0, 0, null, messageWithSecondsTimestamp);
+ mMessageWithSecondsTimestamp = new Message("test", 0, 0, null, messageWithSecondsTimestamp, timestamp);
byte messageWithMillisTimestamp[] =
"{\"timestamp\":\"1405911096123\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
- mMessageWithMillisTimestamp = new Message("test", 0, 0, null, messageWithMillisTimestamp);
+ mMessageWithMillisTimestamp = new Message("test", 0, 0, null, messageWithMillisTimestamp, timestamp);
byte messageWithMillisFloatTimestamp[] =
"{\"timestamp\":\"1405911096123.0\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
- mMessageWithMillisFloatTimestamp = new Message("test", 0, 0, null, messageWithMillisFloatTimestamp);
+ mMessageWithMillisFloatTimestamp = new Message("test", 0, 0, null, messageWithMillisFloatTimestamp, timestamp);
byte messageWithoutTimestamp[] =
"{\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
- mMessageWithoutTimestamp = new Message("test", 0, 0, null, messageWithoutTimestamp);
+ mMessageWithoutTimestamp = new Message("test", 0, 0, null, messageWithoutTimestamp, 0l);
byte messageWithNestedTimestamp[] =
"{\"meta_data\":{\"created\":\"1405911096123\"},\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
- mMessageWithNestedTimestamp = new Message("test", 0, 0, null, messageWithNestedTimestamp);
+ mMessageWithNestedTimestamp = new Message("test", 0, 0, null, messageWithNestedTimestamp, timestamp);
}
+ @Test
+ public void testExtractTimestampMillisFromKafkaTimestamp() throws Exception {
+ Mockito.when(mConfig.useKafkaTimestamp()).thenReturn(true);
+ JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
+
+ assertEquals(timestamp, jsonMessageParser.getTimestampMillis(mMessageWithSecondsTimestamp));
+ assertEquals(timestamp, jsonMessageParser.getTimestampMillis(mMessageWithMillisTimestamp));
+ assertEquals(timestamp, jsonMessageParser.getTimestampMillis(mMessageWithMillisFloatTimestamp));
+ }
+
@Test
public void testExtractTimestampMillis() throws Exception {
JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
- assertEquals(1405911096000l, jsonMessageParser.extractTimestampMillis(mMessageWithSecondsTimestamp));
- assertEquals(1405911096123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisTimestamp));
- assertEquals(1405911096123l, jsonMessageParser.extractTimestampMillis(mMessageWithMillisFloatTimestamp));
+ assertEquals(1405911096000l, jsonMessageParser.getTimestampMillis(mMessageWithSecondsTimestamp));
+ assertEquals(1405911096123l, jsonMessageParser.getTimestampMillis(mMessageWithMillisTimestamp));
+ assertEquals(1405911096123l, jsonMessageParser.getTimestampMillis(mMessageWithMillisFloatTimestamp));
// Return 0 if there's no timestamp, for any reason.
- assertEquals(0l, jsonMessageParser.extractTimestampMillis(mMessageWithoutTimestamp));
+ assertEquals(0l, jsonMessageParser.getTimestampMillis(mMessageWithoutTimestamp));
}
@Test
@@ -93,7 +106,7 @@ public void testExtractNestedTimestampMillis() throws Exception {
Mockito.when(mConfig.getMessageTimestampName()).thenReturn("meta_data.created");
JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
- assertEquals(1405911096123l, jsonMessageParser.extractTimestampMillis(mMessageWithNestedTimestamp));
+ assertEquals(1405911096123l, jsonMessageParser.getTimestampMillis(mMessageWithNestedTimestamp));
}
@Test(expected=ClassCastException.class)
@@ -101,7 +114,7 @@ public void testExtractTimestampMillisException1() throws Exception {
JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
byte emptyBytes1[] = {};
- jsonMessageParser.extractTimestampMillis(new Message("test", 0, 0, null, emptyBytes1));
+ jsonMessageParser.getTimestampMillis(new Message("test", 0, 0, null, emptyBytes1, timestamp));
}
@Test(expected=ClassCastException.class)
@@ -109,7 +122,7 @@ public void testExtractTimestampMillisException2() throws Exception {
JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
byte emptyBytes2[] = "".getBytes();
- jsonMessageParser.extractTimestampMillis(new Message("test", 0, 0, null, emptyBytes2));
+ jsonMessageParser.getTimestampMillis(new Message("test", 0, 0, null, emptyBytes2, timestamp));
}
@Test
@@ -262,7 +275,8 @@ public void testHourlyGetFinalizedUptoPartitions() throws Exception {
assertEquals(expectedPartition, retrievedPartition);
}
}
-@Test
+
+ @Test
public void testMinutelyGetFinalizedUptoPartitions() throws Exception {
Mockito.when(TimestampedMessageParser.usingMinutely(mConfig)).thenReturn(true);
JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);
diff --git a/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java b/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java
old mode 100755
new mode 100644
index 5b7e98748..70c6850c7
--- a/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java
@@ -33,16 +33,18 @@
@RunWith(PowerMockRunner.class)
public class MessagePackParserTest extends TestCase {
+ SecorConfig mConfig;
private MessagePackParser mMessagePackParser;
private Message mMessageWithSecondsTimestamp;
private Message mMessageWithMillisTimestamp;
private Message mMessageWithMillisFloatTimestamp;
private Message mMessageWithMillisStringTimestamp;
private ObjectMapper mObjectMapper;
+ private long timestamp;
@Override
public void setUp() throws Exception {
- SecorConfig mConfig = Mockito.mock(SecorConfig.class);
+ mConfig = Mockito.mock(SecorConfig.class);
Mockito.when(mConfig.getMessageTimestampName()).thenReturn("ts");
Mockito.when(mConfig.getTimeZone()).thenReturn(TimeZone.getTimeZone("UTC"));
Mockito.when(TimestampedMessageParser.usingDateFormat(mConfig)).thenReturn("yyyy-MM-dd");
@@ -55,11 +57,12 @@ public void setUp() throws Exception {
mMessagePackParser = new MessagePackParser(mConfig);
mObjectMapper = new ObjectMapper(new MessagePackFactory());
+ timestamp = System.currentTimeMillis();
HashMap mapWithSecondTimestamp = new HashMap();
mapWithSecondTimestamp.put("ts", 1405970352);
mMessageWithSecondsTimestamp = new Message("test", 0, 0, null,
- mObjectMapper.writeValueAsBytes(mapWithSecondTimestamp));
+ mObjectMapper.writeValueAsBytes(mapWithSecondTimestamp), timestamp);
HashMap mapWithMillisTimestamp = new HashMap();
mapWithMillisTimestamp.put("ts", 1405970352123l);
@@ -67,7 +70,7 @@ public void setUp() throws Exception {
mapWithMillisTimestamp.put("email", "alice@example.com");
mapWithMillisTimestamp.put("age", 27);
mMessageWithMillisTimestamp = new Message("test", 0, 0, null,
- mObjectMapper.writeValueAsBytes(mapWithMillisTimestamp));
+ mObjectMapper.writeValueAsBytes(mapWithMillisTimestamp), timestamp);
HashMap mapWithMillisFloatTimestamp = new HashMap();
@@ -76,7 +79,7 @@ public void setUp() throws Exception {
mapWithMillisFloatTimestamp.put("email", "bob@example.com");
mapWithMillisFloatTimestamp.put("age", 35);
mMessageWithMillisFloatTimestamp = new Message("test", 0, 0, null,
- mObjectMapper.writeValueAsBytes(mapWithMillisFloatTimestamp));
+ mObjectMapper.writeValueAsBytes(mapWithMillisFloatTimestamp), timestamp);
HashMap mapWithMillisStringTimestamp = new HashMap();
mapWithMillisStringTimestamp.put("ts", "1405970352123");
@@ -84,21 +87,33 @@ public void setUp() throws Exception {
mapWithMillisStringTimestamp.put("email", "charlie@example.com");
mapWithMillisStringTimestamp.put("age", 67);
mMessageWithMillisStringTimestamp = new Message("test", 0, 0, null,
- mObjectMapper.writeValueAsBytes(mapWithMillisStringTimestamp));
-
+ mObjectMapper.writeValueAsBytes(mapWithMillisStringTimestamp), timestamp);
+ }
+ @Test
+ public void testExtractTimestampMillisFromKafkaTimestamp() throws Exception {
+ Mockito.when(mConfig.useKafkaTimestamp()).thenReturn(true);
+ mMessagePackParser = new MessagePackParser(mConfig);
+ assertEquals(timestamp, mMessagePackParser.getTimestampMillis(
+ mMessageWithSecondsTimestamp));
+ assertEquals(timestamp, mMessagePackParser.getTimestampMillis(
+ mMessageWithMillisTimestamp));
+ assertEquals(timestamp, mMessagePackParser.getTimestampMillis(
+ mMessageWithMillisFloatTimestamp));
+ assertEquals(timestamp, mMessagePackParser.getTimestampMillis(
+ mMessageWithMillisStringTimestamp));
}
@Test
public void testExtractTimestampMillis() throws Exception {
- assertEquals(1405970352000l, mMessagePackParser.extractTimestampMillis(
+ assertEquals(1405970352000l, mMessagePackParser.getTimestampMillis(
mMessageWithSecondsTimestamp));
- assertEquals(1405970352123l, mMessagePackParser.extractTimestampMillis(
+ assertEquals(1405970352123l, mMessagePackParser.getTimestampMillis(
mMessageWithMillisTimestamp));
- assertEquals(1405970352123l, mMessagePackParser.extractTimestampMillis(
+ assertEquals(1405970352123l, mMessagePackParser.getTimestampMillis(
mMessageWithMillisFloatTimestamp));
- assertEquals(1405970352123l, mMessagePackParser.extractTimestampMillis(
+ assertEquals(1405970352123l, mMessagePackParser.getTimestampMillis(
mMessageWithMillisStringTimestamp));
}
@@ -107,8 +122,8 @@ public void testMissingTimestamp() throws Exception {
HashMap mapWithoutTimestamp = new HashMap();
mapWithoutTimestamp.put("email", "mary@example.com");
Message nMessageWithoutTimestamp = new Message("test", 0, 0, null,
- mObjectMapper.writeValueAsBytes(mapWithoutTimestamp));
- mMessagePackParser.extractTimestampMillis(nMessageWithoutTimestamp);
+ mObjectMapper.writeValueAsBytes(mapWithoutTimestamp), timestamp);
+ mMessagePackParser.getTimestampMillis(nMessageWithoutTimestamp);
}
@Test(expected=NumberFormatException.class)
@@ -116,8 +131,8 @@ public void testUnsupportedTimestampFormat() throws Exception {
HashMap mapWitUnsupportedFormatTimestamp = new HashMap();
mapWitUnsupportedFormatTimestamp.put("ts", "2014-11-14T18:12:52.878Z");
Message nMessageWithUnsupportedFormatTimestamp = new Message("test", 0, 0, null,
- mObjectMapper.writeValueAsBytes(mapWitUnsupportedFormatTimestamp));
- mMessagePackParser.extractTimestampMillis(nMessageWithUnsupportedFormatTimestamp);
+ mObjectMapper.writeValueAsBytes(mapWitUnsupportedFormatTimestamp), timestamp);
+ mMessagePackParser.getTimestampMillis(nMessageWithUnsupportedFormatTimestamp);
}
@Test(expected=NullPointerException.class)
@@ -125,8 +140,8 @@ public void testNullTimestamp() throws Exception {
HashMap mapWitNullTimestamp = new HashMap();
mapWitNullTimestamp.put("ts", null);
Message nMessageWithNullTimestamp = new Message("test", 0, 0, null,
- mObjectMapper.writeValueAsBytes(mapWitNullTimestamp));
- mMessagePackParser.extractTimestampMillis(nMessageWithNullTimestamp);
+ mObjectMapper.writeValueAsBytes(mapWitNullTimestamp), timestamp);
+ mMessagePackParser.getTimestampMillis(nMessageWithNullTimestamp);
}
@Test
diff --git a/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java
old mode 100755
new mode 100644
index 7883a61ba..ce92dc147
--- a/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java
@@ -35,12 +35,13 @@
@RunWith(PowerMockRunner.class)
public class ProtobufMessageParserTest extends TestCase {
private SecorConfig mConfig;
+ private long timestamp;
private Message buildMessage(long timestamp) throws Exception {
byte data[] = new byte[16];
CodedOutputStream output = CodedOutputStream.newInstance(data);
output.writeUInt64(1, timestamp);
- return new Message("test", 0, 0, null, data);
+ return new Message("test", 0, 0, null, data, timestamp);
}
@Override
@@ -52,6 +53,17 @@ public void setUp() throws Exception {
Mockito.when(TimestampedMessageParser.usingDatePrefix(mConfig)).thenReturn("dt=");
Mockito.when(TimestampedMessageParser.usingHourPrefix(mConfig)).thenReturn("hr=");
Mockito.when(TimestampedMessageParser.usingMinutePrefix(mConfig)).thenReturn("min=");
+
+ timestamp = System.currentTimeMillis();
+ }
+
+ @Test
+ public void testExtractTimestampMillisFromKafkaTimestamp() throws Exception {
+ Mockito.when(mConfig.getBoolean("kafka.useTimestamp", false)).thenReturn(true);
+ ProtobufMessageParser parser = new ProtobufMessageParser(mConfig);
+
+ assertEquals(1405970352000l, parser.extractTimestampMillis(buildMessage(1405970352l)));
+ assertEquals(1405970352123l, parser.extractTimestampMillis(buildMessage(1405970352123l)));
}
@Test
@@ -73,11 +85,11 @@ public void testExtractPathTimestampMillis() throws Exception {
UnitTestMessage1 message = UnitTestMessage1.newBuilder().setTimestamp(1405970352L).build();
assertEquals(1405970352000l,
- parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray())));
+ parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray(), timestamp)));
message = UnitTestMessage1.newBuilder().setTimestamp(1405970352123l).build();
assertEquals(1405970352123l,
- parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray())));
+ parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray(), timestamp)));
}
@Test
@@ -92,11 +104,11 @@ public void testExtractNestedTimestampMillis() throws Exception {
UnitTestMessage2 message = UnitTestMessage2.newBuilder()
.setInternal(UnitTestMessage2.Internal.newBuilder().setTimestamp(1405970352L).build()).build();
assertEquals(1405970352000l,
- parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray())));
+ parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray(), timestamp)));
message = UnitTestMessage2.newBuilder()
.setInternal(UnitTestMessage2.Internal.newBuilder().setTimestamp(1405970352123l).build()).build();
assertEquals(1405970352123l,
- parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray())));
+ parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray(), timestamp)));
}
}
diff --git a/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java b/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java
old mode 100755
new mode 100644
index c2c082861..1c662165c
--- a/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java
@@ -22,12 +22,13 @@
@RunWith(PowerMockRunner.class)
public class ProtobufTimestampParserTest extends TestCase {
private SecorConfig mConfig;
+ private long timestamp;
private Message buildMessage(long timestamp) throws Exception {
byte data[] = new byte[16];
CodedOutputStream output = CodedOutputStream.newInstance(data);
output.writeUInt64(1, timestamp);
- return new Message("test", 0, 0, null, data);
+ return new Message("test", 0, 0, null, data, timestamp);
}
@Override
@@ -39,6 +40,17 @@ public void setUp() throws Exception {
Mockito.when(TimestampedMessageParser.usingDatePrefix(mConfig)).thenReturn("dt=");
Mockito.when(TimestampedMessageParser.usingHourPrefix(mConfig)).thenReturn("hr=");
Mockito.when(TimestampedMessageParser.usingMinutePrefix(mConfig)).thenReturn("min=");
+
+ timestamp = System.currentTimeMillis();
+ }
+
+ @Test
+ public void testExtractTimestampMillisFromKafkaTimestamp() throws Exception {
+ Mockito.when(mConfig.getBoolean("kafka.useTimestamp", false)).thenReturn(true);
+ ProtobufMessageParser parser = new ProtobufMessageParser(mConfig);
+
+ assertEquals(1405970352000l, parser.extractTimestampMillis(buildMessage(1405970352l)));
+ assertEquals(1405970352123l, parser.extractTimestampMillis(buildMessage(1405970352123l)));
}
@Test
@@ -65,13 +77,13 @@ public void testExtractPathTimestampMillis() throws Exception {
TimestampedMessages.UnitTestTimestamp1 message = TimestampedMessages.UnitTestTimestamp1.newBuilder().setTimestamp(timestamp).build();
assertEquals(1405970352000l,
- parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray())));
+ parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray(), timestamp.getSeconds())));
Timestamp timestampWithNano = Timestamp.newBuilder().setSeconds(1405970352l)
.setNanos(123000000).build();
message = TimestampedMessages.UnitTestTimestamp1.newBuilder().setTimestamp(timestampWithNano).build();
assertEquals(1405970352123l,
- parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray())));
+ parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray(), timestamp.getSeconds())));
}
@Test
@@ -88,12 +100,12 @@ public void testExtractNestedTimestampMillis() throws Exception {
TimestampedMessages.UnitTestTimestamp2 message = TimestampedMessages.UnitTestTimestamp2.newBuilder()
.setInternal(TimestampedMessages.UnitTestTimestamp2.Internal.newBuilder().setTimestamp(timestamp).build()).build();
assertEquals(1405970352000l,
- parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray())));
+ parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray(), timestamp.getSeconds())));
timestamp = Timestamps.fromMillis(1405970352123l);
message = TimestampedMessages.UnitTestTimestamp2.newBuilder()
.setInternal(TimestampedMessages.UnitTestTimestamp2.Internal.newBuilder().setTimestamp(timestamp).build()).build();
assertEquals(1405970352123l,
- parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray())));
+ parser.extractTimestampMillis(new Message("test", 0, 0, null, message.toByteArray(), timestamp.getSeconds())));
}
}
diff --git a/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java
old mode 100755
new mode 100644
index 1c32af6cc..5f9bcbdab
--- a/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java
@@ -31,6 +31,7 @@ public class RegexMessageParserTest extends TestCase {
private SecorConfig mConfig;
private Message mMessageWithMillisTimestamp;
private Message mMessageWithWrongFormatTimestamp;
+ private long timestamp;
@Override
public void setUp() throws Exception {
@@ -44,14 +45,24 @@ public void setUp() throws Exception {
Mockito.when(TimestampedMessageParser.usingHourPrefix(mConfig)).thenReturn("hr=");
Mockito.when(TimestampedMessageParser.usingMinutePrefix(mConfig)).thenReturn("min=");
+ timestamp = System.currentTimeMillis();
+
byte messageWithMillisTimestamp[] =
"?24.140.88.218 2015/09/22T22:19:00+0000 1442960340 GET http://api.com/test/?id=123 HTTP/1.1 s200 1017 0.384213448 pass - r685206763364 91ea566f - \"for iOS/5.4.2 (iPhone; 9.0)\"".getBytes("UTF-8");
- mMessageWithMillisTimestamp = new Message("test", 0, 0, null, messageWithMillisTimestamp);
+ mMessageWithMillisTimestamp = new Message("test", 0, 0, null, messageWithMillisTimestamp, timestamp);
byte messageWithWrongFormatTimestamp[] =
"?24.140.88.218 2015/09/22T22:19:00+0000 A1442960340 GET http://api.com/test/?id=123 HTTP/1.1 s200 1017 0.384213448 pass - r685206763364 91ea566f - \"for iOS/5.4.2 (iPhone; 9.0)\"".getBytes("UTF-8");
- mMessageWithWrongFormatTimestamp = new Message("test", 0, 0, null, messageWithWrongFormatTimestamp);
+ mMessageWithWrongFormatTimestamp = new Message("test", 0, 0, null, messageWithWrongFormatTimestamp, timestamp);
+
+ }
+
+ @Test
+ public void testExtractTimestampMillisFromKafkaTimestamp() throws Exception {
+ Mockito.when(mConfig.useKafkaTimestamp()).thenReturn(true);
+ RegexMessageParser regexMessageParser = new RegexMessageParser(mConfig);
+ assertEquals(timestamp, regexMessageParser.getTimestampMillis(mMessageWithMillisTimestamp));
}
@Test
@@ -65,7 +76,7 @@ public void testExtractTimestampMillis() throws Exception {
public void testExtractTimestampMillisEmpty() throws Exception {
RegexMessageParser regexMessageParser = new RegexMessageParser(mConfig);
byte emptyBytes2[] = "".getBytes();
- regexMessageParser.extractTimestampMillis(new Message("test", 0, 0, null, emptyBytes2));
+ regexMessageParser.extractTimestampMillis(new Message("test", 0, 0, null, emptyBytes2, timestamp));
}
@Test(expected=NumberFormatException.class)
diff --git a/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java
old mode 100755
new mode 100644
index 95259a535..705979081
--- a/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java
@@ -37,6 +37,7 @@ public class SplitByFieldMessageParserTest extends TestCase {
private Message mMessageWithTypeAndTimestamp;
private Message mMessageWithoutTimestamp;
private Message mMessageWithoutType;
+ private long timestamp;
@Override
public void setUp() throws Exception {
@@ -53,17 +54,19 @@ public void setUp() throws Exception {
Mockito.when(TimestampedMessageParser.usingHourPrefix(mConfig)).thenReturn("hr=");
Mockito.when(TimestampedMessageParser.usingMinutePrefix(mConfig)).thenReturn("min=");
+ timestamp = System.currentTimeMillis();
+
byte messageWithTypeAndTimestamp[] =
"{\"type\":\"event1\",\"timestamp\":\"1405911096000\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
- mMessageWithTypeAndTimestamp = new Message("test", 0, 0, null, messageWithTypeAndTimestamp);
+ mMessageWithTypeAndTimestamp = new Message("test", 0, 0, null, messageWithTypeAndTimestamp, timestamp);
byte messageWithoutTimestamp[] =
"{\"type\":\"event2\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
- mMessageWithoutTimestamp = new Message("test", 0, 0, null, messageWithoutTimestamp);
+ mMessageWithoutTimestamp = new Message("test", 0, 0, null, messageWithoutTimestamp, timestamp);
byte messageWithoutType[] =
"{\"timestamp\":\"1405911096123\",\"id\":0,\"guid\":\"0436b17b-e78a-4e82-accf-743bf1f0b884\",\"isActive\":false,\"balance\":\"$3,561.87\",\"picture\":\"http://placehold.it/32x32\",\"age\":23,\"eyeColor\":\"green\",\"name\":\"Mercedes Brewer\",\"gender\":\"female\",\"company\":\"MALATHION\",\"email\":\"mercedesbrewer@malathion.com\",\"phone\":\"+1 (848) 471-3000\",\"address\":\"786 Gilmore Court, Brule, Maryland, 3200\",\"about\":\"Quis nostrud Lorem deserunt esse ut reprehenderit aliqua nisi et sunt mollit est. Cupidatat incididunt minim anim eiusmod culpa elit est dolor ullamco. Aliqua cillum eiusmod ullamco nostrud Lorem sit amet Lorem aliquip esse esse velit.\\r\\n\",\"registered\":\"2014-01-14T13:07:28 +08:00\",\"latitude\":47.672012,\"longitude\":102.788623,\"tags\":[\"amet\",\"amet\",\"dolore\",\"eu\",\"qui\",\"fugiat\",\"laborum\"],\"friends\":[{\"id\":0,\"name\":\"Rebecca Hardy\"},{\"id\":1,\"name\":\"Sutton Briggs\"},{\"id\":2,\"name\":\"Dena Campos\"}],\"greeting\":\"Hello, Mercedes Brewer! You have 7 unread messages.\",\"favoriteFruit\":\"strawberry\"}".getBytes("UTF-8");
- mMessageWithoutType = new Message("test", 0, 0, null, messageWithoutType);
+ mMessageWithoutType = new Message("test", 0, 0, null, messageWithoutType, timestamp);
}
@Test
diff --git a/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java
old mode 100755
new mode 100644
index 543167910..32692d7ce
--- a/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java
@@ -15,6 +15,7 @@
@RunWith(PowerMockRunner.class)
public class ThriftMessageParserTest extends TestCase {
private SecorConfig mConfig;
+ private long timestamp;
@Override
public void setUp() throws Exception {
@@ -25,6 +26,8 @@ public void setUp() throws Exception {
Mockito.when(TimestampedMessageParser.usingDatePrefix(mConfig)).thenReturn("dt=");
Mockito.when(TimestampedMessageParser.usingHourPrefix(mConfig)).thenReturn("hr=");
Mockito.when(TimestampedMessageParser.usingMinutePrefix(mConfig)).thenReturn("min=");
+
+ timestamp = System.currentTimeMillis();
}
private Message buildMessage(long timestamp, int timestampTwo, long timestampThree) throws Exception {
@@ -32,7 +35,21 @@ private Message buildMessage(long timestamp, int timestampTwo, long timestampThr
TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
byte[] data = serializer.serialize(thriftMessage);
- return new Message("test", 0, 0, null, data);
+ return new Message("test", 0, 0, null, data, timestamp);
+ }
+
+ @Test
+ public void testExtractTimestampFromKafkaTimestamp() throws Exception {
+ Mockito.when(mConfig.getBoolean("kafka.useTimestamp", false)).thenReturn(true);
+ Mockito.when(mConfig.getMessageTimestampName()).thenReturn("blasdjlkjasdkl");
+ Mockito.when(mConfig.getMessageTimestampId()).thenReturn(1);
+ Mockito.when(mConfig.getMessageTimestampType()).thenReturn("i64");
+ Mockito.when(mConfig.getThriftProtocolClass()).thenReturn("org.apache.thrift.protocol.TBinaryProtocol");
+
+ ThriftMessageParser parser = new ThriftMessageParser(mConfig);
+
+ assertEquals(1405970352000L, parser.extractTimestampMillis(buildMessage(1405970352L, 1, 2L)));
+ assertEquals(1405970352123L, parser.extractTimestampMillis(buildMessage(1405970352123L, 1, 2L)));
}
@Test
@@ -84,5 +101,4 @@ public void testAttemptExtractInvalidField() throws Exception {
parser.extractTimestampMillis(buildMessage(1L, 2, 3L));
}
-
}
diff --git a/src/test/java/com/pinterest/secor/performance/PerformanceTest010.java b/src/test/java/com/pinterest/secor/performance/PerformanceTest010.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/performance/PerformanceTest08.java b/src/test/java/com/pinterest/secor/performance/PerformanceTest08.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java b/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java
new file mode 100644
index 000000000..1dec6cd76
--- /dev/null
+++ b/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java
@@ -0,0 +1,23 @@
+package com.pinterest.secor.timestamp;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class KafkaMessageTimestampFactoryTest {
+
+ private KafkaMessageTimestampFactory factory;
+
+ @Test
+ public void shouldReturnKafka8TimestampClassObject() {
+ factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.Kafka8MessageTimestamp");
+ Object timestamp = factory.getKafkaMessageTimestamp();
+ assertNotNull(timestamp);
+ assertEquals(timestamp.getClass(), Kafka8MessageTimestamp.class);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void shouldReturnNullForInvalidClass() {
+ factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.KafkaxxMessageTimestamp");
+ }
+}
diff --git a/src/test/java/com/pinterest/secor/uploader/UploaderTest.java b/src/test/java/com/pinterest/secor/uploader/UploaderTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/util/FileUtilTest.java b/src/test/java/com/pinterest/secor/util/FileUtilTest.java
old mode 100755
new mode 100644
diff --git a/src/test/java/com/pinterest/secor/util/ReflectionUtilTest.java b/src/test/java/com/pinterest/secor/util/ReflectionUtilTest.java
old mode 100755
new mode 100644
diff --git a/src/test/protobuf/unittest.proto b/src/test/protobuf/unittest.proto
old mode 100755
new mode 100644
diff --git a/src/test/protobuf/unittesttimestamp.proto b/src/test/protobuf/unittesttimestamp.proto
old mode 100755
new mode 100644
diff --git a/src/test/thrift/unittest.thrift b/src/test/thrift/unittest.thrift
old mode 100755
new mode 100644