Commit df8c726

update secor version 0.23 to 0.24

Jitendra Singh Sankhwar committed Sep 4, 2017
1 parent 5256829 commit df8c726

Showing 149 changed files with 488 additions and 183 deletions.
Empty file modified DESIGN.md
100755 → 100644
Empty file.
Empty file modified Dockerfile
100755 → 100644
Empty file.
Empty file modified LICENSE
100755 → 100644
Empty file.
Empty file modified Makefile
100755 → 100644
Empty file.
12 changes: 7 additions & 5 deletions README.md
100755 → 100644
@@ -29,13 +29,13 @@ Edit `src/main/config/*.properties` files to specify parameters describing the e

##### Create and install jars
```sh
-# By default this will install the "release" (Kafka 0.8 profile)
+# By default this will install the "release" (Kafka 0.10 profile)
mvn package
mkdir ${SECOR_INSTALL_DIR} # directory to place Secor binaries in.
tar -zxvf target/secor-0.1-SNAPSHOT-bin.tar.gz -C ${SECOR_INSTALL_DIR}

-# To use the Kafka 0.10 client you should use the kafka-0.10-dev profile
-mvn -Pkafka-0.10-dev package
+# To use the Kafka 0.8 client you should use the kafka-0.8-dev profile
+mvn -Pkafka-0.8-dev package
```

##### Run tests (optional)
@@ -62,7 +62,9 @@ One of the convenience features of Secor is the ability to group messages and sa

- **Thrift date parser**: parser that extracts timestamps from thrift messages and groups the output based on the date (at a day granularity). To keep things simple, this parser assumes that the timestamp is carried in the first field (id 1) of the thrift message schema by default. The field id can be changed by setting ```message.timestamp.id``` as long as the field is at the top level of the thrift object (i.e. it is not in a nested structure). The timestamp may be expressed in seconds, milliseconds, or nanoseconds since the epoch. The output goes to date-partitioned paths (e.g., ```s3n://bucket/topic/dt=2014-05-01```, ```s3n://bucket/topic/dt=2014-05-02```). Date partitioning is particularly convenient if the output is to be consumed by ETL tools such as [Hive]. To use this parser, start Secor with properties file [secor.prod.partition.properties](src/main/config/secor.prod.partition.properties). Note that the ```message.timestamp.name``` property has no effect on Thrift parsing, which is determined by the field id.

-- **JSON date parser**: parser that extracts timestamps from JSON messages and groups the output based on the date, similar to the Thrift parser above. To use this parser, start Secor with properties file [secor.prod.partition.properties](src/main/config/secor.prod.partition.properties) and set `secor.message.parser.class=com.pinterest.secor.parser.JsonMessageParser`. You may override the field used to extract the timestamp by setting the "message.timestamp.name" property.
+- **JSON timestamp parser**: parser that extracts UNIX timestamps from JSON messages and groups the output based on the date, similar to the Thrift parser above. To use this parser, start Secor with properties file [secor.prod.partition.properties](src/main/config/secor.prod.partition.properties) and set `secor.message.parser.class=com.pinterest.secor.parser.JsonMessageParser`. You may override the field used to extract the timestamp by setting the "message.timestamp.name" property.

- **JSON ISO 8601 date parser**: Assumes your timestamp field uses ISO 8601. To use this parser, start Secor with properties file [secor.prod.partition.properties](src/main/config/secor.prod.partition.properties) and set `secor.message.parser.class=com.pinterest.secor.parser.Iso8601MessageParser`. You may override the field used to extract the timestamp by setting the "message.timestamp.name" property.

- **MessagePack date parser**: parser that extracts timestamps from MessagePack messages and groups the output based on the date, similar to the Thrift and JSON parsers. To use this parser, set `secor.message.parser.class=com.pinterest.secor.parser.MessagePackParser`. Like the Thrift parser, the timestamp may be expressed in seconds, milliseconds, or nanoseconds since the epoch; this parser also respects the "message.timestamp.name" property.
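To make the JSON setup above concrete, here is a minimal configuration sketch for the JSON timestamp parser. This is illustrative only, not part of this commit, and the `ts` field name is a made-up example:

```properties
# Hypothetical example values
secor.message.parser.class=com.pinterest.secor.parser.JsonMessageParser
message.timestamp.name=ts
```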

@@ -176,7 +178,7 @@ Secor is distributed under [Apache License, Version 2.0](http://www.apache.org/l
* [Rakuten](http://techblog.rakuten.co.jp/)
* [Appsflyer](https://www.appsflyer.com)
* [Wego](https://www.wego.com)
-* [GO_JEK](http://gojekengineering.com/)
+* [GO-JEK](http://gojekengineering.com/)

## Help

Empty file modified containers/trusty/Dockerfile
100755 → 100644
Empty file.
Empty file modified containers/xenial/Dockerfile
100755 → 100644
Empty file.
28 changes: 25 additions & 3 deletions pom.xml
100755 → 100644
@@ -6,7 +6,7 @@

<groupId>com.pinterest</groupId>
<artifactId>secor</artifactId>
-<version>0.23</version>
+<version>0.24-SNAPSHOT</version>
<packaging>jar</packaging>
<name>secor</name>
<description>Kafka to s3/gs/swift logs exporter</description>
@@ -100,12 +100,12 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
-<version>1.10.68</version>
+<version>1.11.160</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
-<version>1.10.68</version>
+<version>1.11.160</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
Expand Down Expand Up @@ -536,6 +536,14 @@
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>

@@ -602,6 +610,20 @@
</profile>
<profile>
<id>kafka-0.8-dev</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<excludes>
<exclude>com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
Empty file modified src/main/assembly/secor.xml
100755 → 100644
Empty file.
Empty file modified src/main/config/kafka.test.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/log4j.dev.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/log4j.docker.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/log4j.prod.properties
100755 → 100644
Empty file.
12 changes: 12 additions & 0 deletions src/main/config/secor.common.properties
100755 → 100644
@@ -181,6 +181,17 @@ kafka.dual.commit.enabled=true
# Possible values: "zookeeper" to read offset from zookeeper or "kafka" to read offset from kafka consumer topic
kafka.offsets.storage=zookeeper

# Whether to extract the Kafka message timestamp. Enable this option only with 0.10.x Kafka brokers.
# Default value is false. When setting this parameter to `true`, also set
# `kafka.message.timestamp.className` to `com.pinterest.secor.timestamp.Kafka10MessageTimestamp`.
kafka.useTimestamp=false

# Class name of the timestamp extractor to use. Default is `com.pinterest.secor.timestamp.Kafka10MessageTimestamp`
# for the 0.10 build profile. Use `Kafka8MessageTimestamp` with Kafka 0.8 and `Kafka10MessageTimestamp`
# with Kafka 0.10; the fully qualified names are `com.pinterest.secor.timestamp.Kafka8MessageTimestamp` and
# `com.pinterest.secor.timestamp.Kafka10MessageTimestamp`.
kafka.message.timestamp.className=com.pinterest.secor.timestamp.Kafka10MessageTimestamp

# Secor generation is a version that should be incremented during non-backwards-compatible
# Secor releases. Generation number is one of the components of generated log file names.
# Generation number makes sure that outputs of different Secor versions are isolated.
@@ -404,3 +415,4 @@ parquet.validation=false
secor.orc.message.schema.*=struct<a:int\,b:int\,c:struct<d:int\,e:string>\,f:array<string>\,g:int>
# The config below defines the ORC schema provider class name. A custom ORC schema provider implementation can be supplied here.
secor.orc.schema.provider=com.pinterest.secor.util.orc.schema.DefaultORCSchemaProvider

Empty file modified src/main/config/secor.dev.azure.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.dev.backup.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.dev.gs.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.dev.hr.partition.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.dev.partition.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.dev.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.prod.backup.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.prod.partition.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.prod.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.test.backup.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.test.partition.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.test.perf.backup.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/secor.test.properties
100755 → 100644
Empty file.
Empty file modified src/main/config/zookeeper.test.properties
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/common/FileRegistry.java
100755 → 100644
Empty file.
9 changes: 8 additions & 1 deletion src/main/java/com/pinterest/secor/common/KafkaClient.java
100755 → 100644
@@ -18,6 +18,7 @@

import com.google.common.net.HostAndPort;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.timestamp.KafkaMessageTimestampFactory;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
@@ -50,10 +51,12 @@ public class KafkaClient {

private SecorConfig mConfig;
private ZookeeperConnector mZookeeperConnector;
private KafkaMessageTimestampFactory mKafkaMessageTimestampFactory;

public KafkaClient(SecorConfig config) {
mConfig = config;
mZookeeperConnector = new ZookeeperConnector(mConfig);
mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}

private HostAndPort findLeader(TopicPartition topicPartition) {
Expand Down Expand Up @@ -141,8 +144,12 @@ private Message getMessage(TopicPartition topicPartition, long offset,
payloadBytes = new byte[payload.limit()];
payload.get(payloadBytes);
}
long timestamp = (mConfig.useKafkaTimestamp())
? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(messageAndOffset)
: 0L;

return new Message(topicPartition.getTopic(), topicPartition.getPartition(),
-messageAndOffset.offset(), keyBytes, payloadBytes);
+messageAndOffset.offset(), keyBytes, payloadBytes, timestamp);
}

private SimpleConsumer createConsumer(String host, int port, String clientName) {
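The `KafkaMessageTimestampFactory` referenced above is not shown in this diff. Purely as an illustration of one plausible shape — the class and method names below are assumptions, not Secor's actual implementation — it presumably resolves the `kafka.message.timestamp.className` setting reflectively:

```java
// Hypothetical sketch: reflectively instantiate the configured timestamp class.
// Secor's real KafkaMessageTimestampFactory may differ.
public class TimestampFactorySketch {
    private final Class<?> mTimestampClass;

    public TimestampFactorySketch(String className) {
        try {
            // e.g. "com.pinterest.secor.timestamp.Kafka10MessageTimestamp"
            mTimestampClass = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown timestamp class: " + className, e);
        }
    }

    public Object newExtractor() {
        try {
            return mTimestampClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + mTimestampClass, e);
        }
    }
}
```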
Empty file modified src/main/java/com/pinterest/secor/common/LogFilePath.java
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/common/OffsetTracker.java
100755 → 100644
Empty file.
10 changes: 9 additions & 1 deletion src/main/java/com/pinterest/secor/common/SecorConfig.java
100755 → 100644
@@ -131,6 +131,14 @@ public String getOffsetsStorage() {
return getString("kafka.offsets.storage");
}

public boolean useKafkaTimestamp() {
return getBoolean("kafka.useTimestamp", false);
}

public String getKafkaMessageTimestampClass() {
return getString("kafka.message.timestamp.className");
}

public int getGeneration() {
return getInt("secor.generation");
}
@@ -567,7 +575,7 @@ public String getThriftProtocolClass() {
}

public String getMetricsCollectorClass() {
-return mProperties.getString("secor.monitoring.metrics.collector.class");
+return getString("secor.monitoring.metrics.collector.class");
}

/**
Empty file modified src/main/java/com/pinterest/secor/common/TopicPartition.java
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/consumer/Consumer.java
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/io/FileReader.java
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/io/FileWriter.java
100755 → 100644
Empty file.
28 changes: 25 additions & 3 deletions src/main/java/com/pinterest/secor/io/KeyValue.java
100755 → 100644
@@ -19,28 +19,39 @@
/**
* Generic Object used to read next message from various file reader
* implementations
*
*
* @author Praveen Murugesan ([email protected])
*
*/
public class KeyValue {

private final long mOffset;
private final byte[] mKafkaKey;
private final byte[] mValue;
private final long mTimestamp;

// constructor
public KeyValue(long offset, byte[] value) {
this.mOffset = offset;
this.mKafkaKey = new byte[0];
this.mValue = value;
this.mTimestamp = -1;
}

// constructor
public KeyValue(long offset, byte[] kafkaKey, byte[] value) {
this.mOffset = offset;
this.mKafkaKey = kafkaKey;
this.mValue = value;
this.mTimestamp = -1;
}

// constructor
public KeyValue(long offset, byte[] kafkaKey, byte[] value, long timestamp) {
this.mOffset = offset;
this.mKafkaKey = kafkaKey;
this.mValue = value;
this.mTimestamp = timestamp;
}

public long getOffset() {
@@ -50,9 +61,20 @@ public long getOffset() {
public byte[] getKafkaKey() {
return this.mKafkaKey;
}

public byte[] getValue() {
return this.mValue;
}

public long getTimestamp() {
return this.mTimestamp;
}

public boolean hasKafkaKey() {
return this.mKafkaKey != null && this.mKafkaKey.length != 0;
}

public boolean hasTimestamp() {
return this.mTimestamp != -1;
}
}
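As a usage illustration (not from the commit), the new four-argument constructor carries the timestamp through, while the older constructors keep -1 as the "no timestamp" sentinel. All values below are made up:

```java
import com.pinterest.secor.io.KeyValue;

public class KeyValueDemo {
    public static void main(String[] args) {
        // Timestamp-aware form: 1504512000000L is a hypothetical epoch-millis value.
        KeyValue kv = new KeyValue(42L, "key".getBytes(), "payload".getBytes(), 1504512000000L);
        System.out.println(kv.hasTimestamp());     // true
        System.out.println(kv.getTimestamp());     // 1504512000000

        // Legacy form: timestamp defaults to -1, so hasTimestamp() is false.
        KeyValue legacy = new KeyValue(42L, "payload".getBytes());
        System.out.println(legacy.hasTimestamp()); // false
    }
}
```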
42 changes: 34 additions & 8 deletions src/main/java/com/pinterest/secor/io/impl/MessagePackSequenceFileReaderWriterFactory.java
100755 → 100644
@@ -44,6 +44,7 @@
public class MessagePackSequenceFileReaderWriterFactory implements FileReaderWriterFactory {
private static final int KAFKA_MESSAGE_OFFSET = 1;
private static final int KAFKA_HASH_KEY = 2;
private static final int KAFKA_MESSAGE_TIMESTAMP = 3;
private static final byte[] EMPTY_BYTES = new byte[0];

@Override
@@ -76,13 +77,17 @@ public KeyValue next() throws IOException {
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(mKey.getBytes());
int mapSize = unpacker.unpackMapHeader();
long offset = 0;
long timestamp = -1;
byte[] keyBytes = EMPTY_BYTES;
for (int i = 0; i < mapSize; i++) {
int key = unpacker.unpackInt();
switch (key) {
case KAFKA_MESSAGE_OFFSET:
offset = unpacker.unpackLong();
break;
case KAFKA_MESSAGE_TIMESTAMP:
timestamp = unpacker.unpackLong();
break;
case KAFKA_HASH_KEY:
int keySize = unpacker.unpackBinaryHeader();
keyBytes = new byte[keySize];
@@ -91,7 +96,7 @@ public KeyValue next() throws IOException {
}
}
unpacker.close();
-return new KeyValue(offset, keyBytes, Arrays.copyOfRange(mValue.getBytes(), 0, mValue.getLength()));
+return new KeyValue(offset, keyBytes, Arrays.copyOfRange(mValue.getBytes(), 0, mValue.getLength()), timestamp);
} else {
return null;
}
@@ -131,25 +136,34 @@ public long getLength() throws IOException {

@Override
public void write(KeyValue keyValue) throws IOException {
-byte[] kafkaKey = keyValue.getKafkaKey();
+byte[] kafkaKey = keyValue.hasKafkaKey() ? keyValue.getKafkaKey() : new byte[0];
long timestamp = keyValue.getTimestamp();
final int timestampLength = (keyValue.hasTimestamp()) ? 10 : 0;
// output size estimate
// 1 - map header
// 1 - message pack key
// 9 - max kafka offset
// 1 - message pack key
// 9 - kafka timestamp
// 1 - message pack key
// 5 - max (sane) kafka key size
// N - size of kafka key
-// = 17 + N
-ByteArrayOutputStream out = new ByteArrayOutputStream(17 + kafkaKey.length);
+// = 27 + N
+ByteArrayOutputStream out = new ByteArrayOutputStream(17 + timestampLength + kafkaKey.length);
MessagePacker packer = MessagePack.newDefaultPacker(out)
-.packMapHeader((kafkaKey.length == 0) ? 1 : 2)
+.packMapHeader(numberOfFieldsMappedInHeader(keyValue))
.packInt(KAFKA_MESSAGE_OFFSET)
.packLong(keyValue.getOffset());
-if (kafkaKey.length != 0) {
+if (keyValue.hasTimestamp())
+packer.packInt(KAFKA_MESSAGE_TIMESTAMP)
+.packLong(timestamp);
+
+if (keyValue.hasKafkaKey())
packer.packInt(KAFKA_HASH_KEY)
.packBinaryHeader(kafkaKey.length)
.writePayload(kafkaKey);
-}

packer.close();
byte[] outBytes = out.toByteArray();
this.mKey.set(outBytes, 0, outBytes.length);
@@ -161,5 +175,17 @@ public void write(KeyValue keyValue) throws IOException {
public void close() throws IOException {
this.mWriter.close();
}

private int numberOfFieldsMappedInHeader(KeyValue keyValue) {
int fields = 1;

if (keyValue.hasKafkaKey())
fields++;

if (keyValue.hasTimestamp())
fields++;

return fields;
}
}
}
}
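For readers unfamiliar with the encoding above: the sequence-file key is a MessagePack map keyed by the small integers `KAFKA_MESSAGE_OFFSET` (1), `KAFKA_HASH_KEY` (2), and the new `KAFKA_MESSAGE_TIMESTAMP` (3). Below is a self-contained round-trip sketch of that layout using the msgpack-core API, with made-up values; it is an illustration, not Secor code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;

public class KeyLayoutDemo {
    public static void main(String[] args) throws IOException {
        byte[] kafkaKey = "user-123".getBytes();  // hypothetical Kafka key
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        MessagePacker packer = MessagePack.newDefaultPacker(out)
                .packMapHeader(3)                     // offset + timestamp + key
                .packInt(1).packLong(42L)             // KAFKA_MESSAGE_OFFSET
                .packInt(3).packLong(1504512000000L)  // KAFKA_MESSAGE_TIMESTAMP
                .packInt(2)                           // KAFKA_HASH_KEY
                .packBinaryHeader(kafkaKey.length);
        packer.writePayload(kafkaKey);
        packer.close();

        // Decode the map the same way the reader's switch statement does.
        MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(out.toByteArray());
        int entries = unpacker.unpackMapHeader();
        for (int i = 0; i < entries; i++) {
            switch (unpacker.unpackInt()) {
                case 1: System.out.println("offset=" + unpacker.unpackLong()); break;
                case 3: System.out.println("timestamp=" + unpacker.unpackLong()); break;
                case 2:
                    byte[] key = unpacker.readPayload(unpacker.unpackBinaryHeader());
                    System.out.println("key=" + new String(key));
                    break;
            }
        }
        unpacker.close();
    }
}
```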
Empty file modified src/main/java/com/pinterest/secor/main/ConsumerMain.java
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/main/LogFilePrinterMain.java
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/main/LogFileVerifierMain.java
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/main/ProgressMonitorMain.java
100755 → 100644
Empty file.
Empty file modified src/main/java/com/pinterest/secor/main/ZookeeperClientMain.java
100755 → 100644
Empty file.
11 changes: 9 additions & 2 deletions src/main/java/com/pinterest/secor/message/Message.java
100755 → 100644
@@ -34,21 +34,23 @@ public class Message {
private long mOffset;
private byte[] mKafkaKey;
private byte[] mPayload;
private long mTimestamp;

protected String fieldsToString() {
return "topic='" + mTopic + '\'' +
", kafkaPartition=" + mKafkaPartition +
", offset=" + mOffset +
", kafkaKey=" + new String(mKafkaKey) +
", payload=" + new String(mPayload);
", payload=" + new String(mPayload) +
", timestamp=" + mTimestamp;
}

@Override
public String toString() {
return "Message{" + fieldsToString() + '}';
}

-public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload) {
+public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload, long timestamp) {
mTopic = topic;
mKafkaPartition = kafkaPartition;
mOffset = offset;
@@ -60,6 +62,7 @@ public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, b
if (mPayload == null) {
mPayload = EMPTY_BYTES;
}
mTimestamp = timestamp;
}

public String getTopic() {
@@ -82,6 +85,10 @@ public byte[] getPayload() {
return mPayload;
}

public long getTimestamp() {
return mTimestamp;
}

public void write(OutputStream output) throws IOException {
output.write(mPayload);
}
4 changes: 2 additions & 2 deletions src/main/java/com/pinterest/secor/message/ParsedMessage.java
100755 → 100644
@@ -35,8 +35,8 @@ public String toString() {
}

public ParsedMessage(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload,
-String[] mPartitions) {
-super(topic, kafkaPartition, offset, kafkaKey, payload);
+String[] mPartitions, long timestamp) {
+super(topic, kafkaPartition, offset, kafkaKey, payload, timestamp);
this.mPartitions = mPartitions;
}

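To illustrate the widened constructors (illustrative values only, not part of the commit; the partition component mimics the `dt=` paths described in the README):

```java
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;

public class MessageDemo {
    public static void main(String[] args) {
        long ts = 1504512000000L;  // hypothetical epoch-millis timestamp
        Message message = new Message("events", 0, 42L,
                "key".getBytes(), "{\"ts\":1504512000}".getBytes(), ts);
        System.out.println(message);  // toString() now includes the timestamp

        ParsedMessage parsed = new ParsedMessage("events", 0, 42L,
                "key".getBytes(), "{\"ts\":1504512000}".getBytes(),
                new String[]{"dt=2017-09-04"}, ts);
        System.out.println(parsed.getTimestamp());  // 1504512000000
    }
}
```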