Skip to content

Commit

Permalink
Merge pull request #343 from e-v-nguyen/master
Browse files Browse the repository at this point in the history
Add AZ Header to Kafka Logs
  • Loading branch information
jfzunigac authored Sep 7, 2023
2 parents 48ea723 + a7501e4 commit a8b0966
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.76</version>
<version>0.8.0.77</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
Expand Down
2 changes: 1 addition & 1 deletion singer-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.76</version>
<version>0.8.0.77</version>
<relativePath>../pom.xml</relativePath>
</parent>
<developers>
Expand Down
1 change: 1 addition & 0 deletions singer-commons/src/main/thrift/text_message.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ struct TextMessage {
2: optional string host;
3: optional string filename;
4: optional string prependEnvironmentVariables;
5: optional string availabilityZone;
}
2 changes: 1 addition & 1 deletion singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.76</version>
<version>0.8.0.77</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class TextLogFileReader implements LogFileReader {
private final TextLogMessageType textLogMessageType;

private String hostname;

private String availabilityZone;
private boolean trimTailingNewlineCharacter;

private Map<String, ByteBuffer> headers;
Expand All @@ -79,6 +79,7 @@ public TextLogFileReader(
boolean prependHostName,
boolean trimTailingNewlineCharacter,
String hostname,
String availabilityZone,
String prependFieldDelimiter,
Map<String, ByteBuffer> headers) throws Exception {
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
Expand All @@ -89,9 +90,11 @@ public TextLogFileReader(
if (headers != null) {
headers.put("hostname", SingerUtils.getByteBuf(hostname));
headers.put("file", SingerUtils.getByteBuf(path));
headers.put("availabilityZone", SingerUtils.getByteBuf(availabilityZone));
}

this.hostname = hostname;
this.availabilityZone = availabilityZone;
this.logFile = Preconditions.checkNotNull(logFile);
this.path = path;
this.numMessagesPerLogMessage = numMessagesPerLogMessage;
Expand Down Expand Up @@ -172,6 +175,7 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc
TextMessage textMessage = new TextMessage();
textMessage.setFilename(path);
textMessage.setHost(hostname);
textMessage.setAvailabilityZone(availabilityZone);
textMessage.addToMessages(TextMessageReader.bufToString(out));
logMessage = new LogMessage(ByteBuffer.wrap(serializer.serialize(textMessage)));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public LogFileReader getLogFileReader(
readerConfig.isPrependHostname(),
readerConfig.isTrimTailingNewlineCharacter(),
SingerUtils.getHostNameBasedOnConfig(logStream, SingerSettings.getSingerConfig()),
SingerSettings.getEnvironment().getLocality(),
readerConfig.getPrependFieldDelimiter(),
readerConfig.getEnvironmentVariables() != null
? new HashMap<>(readerConfig.getEnvironmentVariables())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public ThriftLogFileReader(
int readBufferSize,
int maxMessageSize,
String hostname,
String availabilityZone,
Map<String, ByteBuffer> headers) throws Exception {
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
Preconditions.checkArgument(byteOffset >= 0);
Expand All @@ -108,6 +109,7 @@ public ThriftLogFileReader(
if (headers != null) {
headers.put("hostname", SingerUtils.getByteBuf(hostname));
headers.put("file", SingerUtils.getByteBuf(path));
headers.put("availabilityZone", SingerUtils.getByteBuf(availabilityZone));
}

this.thriftReader = new ThriftReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public LogFileReader getLogFileReader(LogStream logStream, LogFile logFile, Stri
readerConfig.getReaderBufferSize(),
readerConfig.getMaxMessageSize(),
SingerUtils.getHostNameBasedOnConfig(logStream, SingerSettings.getSingerConfig()),
SingerSettings.getEnvironment().getLocality(),
readerConfig.isSetEnvironmentVariables() ?
new HashMap<>(readerConfig.getEnvironmentVariables()) : null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testReadLogMessageAndPosition() throws Exception {
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, null, null,
null);
null, null);
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertEquals(dataWritten.get(i).trim(), new String(log.getLogMessage().getMessage()));
Expand All @@ -70,7 +70,7 @@ public void testReadLogMessageAndPositionWithHostname() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, true, false, hostname,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, true, false, hostname, "n/a",
delimiter, null);
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand All @@ -91,8 +91,8 @@ public void testReadLogMessageAndPositionMultiRead() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, null, null,
null);
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, null, "n/a",
null, null);
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertEquals(dataWritten.get(i) + dataWritten.get(i + 1).trim(),
Expand All @@ -111,19 +111,23 @@ public void testEnvironmentVariableInjection() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", null,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null,
new HashMap<>(ImmutableMap.of("test", ByteBuffer.wrap("value".getBytes()))));
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertEquals(3, log.getInjectedHeadersSize());
assertEquals(4, log.getInjectedHeadersSize());
assertTrue(log.getInjectedHeaders().containsKey("hostname"));
assertTrue(log.getInjectedHeaders().containsKey("file"));
assertTrue(log.getInjectedHeaders().containsKey("availabilityZone"));
assertTrue(log.getInjectedHeaders().containsKey("test"));
assertEquals(dataWritten.get(i) + dataWritten.get(i + 1).trim(),
new String(log.getLogMessage().getMessage()));
}
assertNull(reader.readLogMessageAndPosition());
reader.close();

reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", null,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null,
null);
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testReadBadMessage() throws Exception {
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");

// Open reader which cap the log message at 500 bytes
LogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 500,"localhost", null);
LogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 500,"localhost", "us-east-1a",null);
int count = 0;
for (int i = 0; i < 403; i++) {
try {
Expand Down Expand Up @@ -95,7 +95,7 @@ public void testReadLogMessageAndPosition() throws Exception {
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");

// Open reader which cap the log message at 500 bytes
LogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 500, "localhost", null);
LogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 500, "localhost", "us-east-1a",null);
try {
// Seek to start offset.
reader.setByteOffset(startOffset);
Expand All @@ -111,7 +111,7 @@ public void testReadLogMessageAndPosition() throws Exception {
}

// Open reader.
reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 16000, "localhost", null);
reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 16000, "localhost", "us-east-1a",null);
List<LogMessageAndPosition> messagesRead = Lists.newArrayListWithExpectedSize(3);
try {
// Seek to start offset.
Expand Down Expand Up @@ -145,7 +145,7 @@ public void testEnvironmentVariableInjection() throws Exception {
map.put("test", ByteBuffer.wrap("test_value".getBytes()));

// Open reader.
ThriftLogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 16000, "localhost",
ThriftLogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 16000, "localhost", "us-east-1a",
map);
List<LogMessageAndPosition> messagesRead = Lists.newArrayListWithExpectedSize(3);
try {
Expand All @@ -154,11 +154,12 @@ public void testEnvironmentVariableInjection() throws Exception {
while (message != null) {
messagesRead.add(message);
assertNotNull(message.getInjectedHeaders());
assertEquals(1 + 2, message.getInjectedHeaders().size());
assertEquals(1 + 3, message.getInjectedHeaders().size());
assertTrue(message.getInjectedHeaders().containsKey("test"));
assertTrue(Arrays.equals("test_value".getBytes(), message.getInjectedHeaders().get("test").array()));
assertTrue(message.getInjectedHeaders().containsKey("hostname"));
assertTrue(message.getInjectedHeaders().containsKey("file"));
assertTrue(message.getInjectedHeaders().containsKey("availabilityZone"));
message = reader.readLogMessageAndPosition();
}
} finally {
Expand Down
2 changes: 1 addition & 1 deletion thrift-logger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.76</version>
<version>0.8.0.77</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>thrift-logger</artifactId>
Expand Down

0 comments on commit a8b0966

Please sign in to comment.