Skip to content

Commit

Permalink
Merge pull request #74 from zzhhhzz/inject
Browse files Browse the repository at this point in the history
add condition check before injecting ProducerRecord's Headers
  • Loading branch information
ambud authored Aug 6, 2020
2 parents 637936d + 097ff9d commit d6585a7
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.23</version>
<version>0.8.0.24</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
Expand Down
2 changes: 1 addition & 1 deletion singer-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.23</version>
<version>0.8.0.24</version>
<relativePath>../pom.xml</relativePath>
</parent>
<developers>
Expand Down
2 changes: 1 addition & 1 deletion singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.23</version>
<version>0.8.0.24</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class SingerMetrics {
public static final String NUMBER_OF_MISSING_DIRS = "singer.missing_dir_checker.num_of_missing_dirs";
public static final String NUMBER_OF_SERIALIZING_HEADERS_ERRORS = "singer.headers_injector.num_of_serializing_headers_errors";
public static final String AUDIT_HEADERS_INJECTED = "singer.audit.num_of_headers_injected";
public static final String CHECKSUM_INJECTED = "singer.audit.num_of_checksum_injected";
public static final String AUDIT_HEADERS_METADATA_COUNT_MISMATCH = "singer.audit.headers_metadata_count_mismatch";
public static final String AUDIT_HEADERS_METADATA_COUNT_MATCH = "singer.audit.headers_metadata_count_match";
public static final String NUM_CORRUPTED_MESSAGES = "singer.audit.num_corrupted_messages";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,15 @@ public boolean checkMessageValidAndInjectHeaders(

protected void injectHeadersForProducerRecord(LogMessage msg, Headers headers) {
try {
byte[] serializedAuditHeaders = SERIALIZER.get().serialize(msg.getLoggingAuditHeaders());
this.headersInjector.addHeaders(headers, LOGGING_AUDIT_HEADER_KEY, serializedAuditHeaders);
this.headersInjector.addHeaders(headers, CRC_HEADER_KEY, Longs.toByteArray(msg.getChecksum()));
OpenTsdbMetricConverter.incr(SingerMetrics.AUDIT_HEADERS_INJECTED, "topic=" + topic, "host=" + HOSTNAME, "logName=" + msg.getLoggingAuditHeaders().getLogName(), "logStreamName=" + logName);
if (msg.isSetLoggingAuditHeaders()) {
byte[] serializedAuditHeaders = SERIALIZER.get().serialize(msg.getLoggingAuditHeaders());
this.headersInjector.addHeaders(headers, LOGGING_AUDIT_HEADER_KEY, serializedAuditHeaders);
OpenTsdbMetricConverter.incr(SingerMetrics.AUDIT_HEADERS_INJECTED, "host=" + HOSTNAME, "logStreamName=" + logName);
}
if (msg.isSetChecksum()) {
this.headersInjector.addHeaders(headers, CRC_HEADER_KEY, Longs.toByteArray(msg.getChecksum()));
OpenTsdbMetricConverter.incr(SingerMetrics.CHECKSUM_INJECTED, "host=" + HOSTNAME, "logStreamName=" + logName);
}
} catch (TException e) {
OpenTsdbMetricConverter.incr(SingerMetrics.NUMBER_OF_SERIALIZING_HEADERS_ERRORS);
LOG.warn("Exception thrown while serializing headers", e);
Expand Down
2 changes: 1 addition & 1 deletion thrift-logger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.23</version>
<version>0.8.0.24</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>thrift-logger</artifactId>
Expand Down

0 comments on commit d6585a7

Please sign in to comment.