Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AZ Header to Kafka Logs #343

Merged
merged 2 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.76</version>
<version>0.8.0.77</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
Expand Down
2 changes: 1 addition & 1 deletion singer-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.76</version>
<version>0.8.0.77</version>
<relativePath>../pom.xml</relativePath>
</parent>
<developers>
Expand Down
1 change: 1 addition & 0 deletions singer-commons/src/main/thrift/text_message.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ struct TextMessage {
2: optional string host;
3: optional string filename;
4: optional string prependEnvironmentVariables;
5: optional string availabilityZone;
}
2 changes: 1 addition & 1 deletion singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.76</version>
<version>0.8.0.77</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class TextLogFileReader implements LogFileReader {
private final TextLogMessageType textLogMessageType;

private String hostname;

private String availabilityZone;
private boolean trimTailingNewlineCharacter;

private Map<String, ByteBuffer> headers;
Expand All @@ -79,6 +79,7 @@ public TextLogFileReader(
boolean prependHostName,
boolean trimTailingNewlineCharacter,
String hostname,
String availabilityZone,
String prependFieldDelimiter,
Map<String, ByteBuffer> headers) throws Exception {
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
Expand All @@ -89,9 +90,11 @@ public TextLogFileReader(
if (headers != null) {
headers.put("hostname", SingerUtils.getByteBuf(hostname));
headers.put("file", SingerUtils.getByteBuf(path));
headers.put("availabilityZone", SingerUtils.getByteBuf(availabilityZone));
}

this.hostname = hostname;
this.availabilityZone = availabilityZone;
this.logFile = Preconditions.checkNotNull(logFile);
this.path = path;
this.numMessagesPerLogMessage = numMessagesPerLogMessage;
Expand Down Expand Up @@ -172,6 +175,7 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc
TextMessage textMessage = new TextMessage();
textMessage.setFilename(path);
textMessage.setHost(hostname);
textMessage.setAvailabilityZone(availabilityZone);
textMessage.addToMessages(TextMessageReader.bufToString(out));
logMessage = new LogMessage(ByteBuffer.wrap(serializer.serialize(textMessage)));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public LogFileReader getLogFileReader(
readerConfig.isPrependHostname(),
readerConfig.isTrimTailingNewlineCharacter(),
SingerUtils.getHostNameBasedOnConfig(logStream, SingerSettings.getSingerConfig()),
SingerSettings.getEnvironment().getLocality(),
readerConfig.getPrependFieldDelimiter(),
readerConfig.getEnvironmentVariables() != null
? new HashMap<>(readerConfig.getEnvironmentVariables())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public ThriftLogFileReader(
int readBufferSize,
int maxMessageSize,
String hostname,
String availabilityZone,
Map<String, ByteBuffer> headers) throws Exception {
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
Preconditions.checkArgument(byteOffset >= 0);
Expand All @@ -108,6 +109,7 @@ public ThriftLogFileReader(
if (headers != null) {
headers.put("hostname", SingerUtils.getByteBuf(hostname));
headers.put("file", SingerUtils.getByteBuf(path));
headers.put("availabilityZone", SingerUtils.getByteBuf(availabilityZone));
}

this.thriftReader = new ThriftReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public LogFileReader getLogFileReader(LogStream logStream, LogFile logFile, Stri
readerConfig.getReaderBufferSize(),
readerConfig.getMaxMessageSize(),
SingerUtils.getHostNameBasedOnConfig(logStream, SingerSettings.getSingerConfig()),
SingerSettings.getEnvironment().getLocality(),
readerConfig.isSetEnvironmentVariables() ?
new HashMap<>(readerConfig.getEnvironmentVariables()) : null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testReadLogMessageAndPosition() throws Exception {
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, null, null,
null);
null, null);
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertEquals(dataWritten.get(i).trim(), new String(log.getLogMessage().getMessage()));
Expand All @@ -70,7 +70,7 @@ public void testReadLogMessageAndPositionWithHostname() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, true, false, hostname,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, true, false, hostname, "n/a",
delimiter, null);
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand All @@ -91,8 +91,8 @@ public void testReadLogMessageAndPositionMultiRead() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, null, null,
null);
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, null, "n/a",
null, null);
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertEquals(dataWritten.get(i) + dataWritten.get(i + 1).trim(),
Expand All @@ -111,19 +111,23 @@ public void testEnvironmentVariableInjection() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", null,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null,
new HashMap<>(ImmutableMap.of("test", ByteBuffer.wrap("value".getBytes()))));
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertEquals(3, log.getInjectedHeadersSize());
assertEquals(4, log.getInjectedHeadersSize());
assertTrue(log.getInjectedHeaders().containsKey("hostname"));
assertTrue(log.getInjectedHeaders().containsKey("file"));
assertTrue(log.getInjectedHeaders().containsKey("availabilityZone"));
assertTrue(log.getInjectedHeaders().containsKey("test"));
assertEquals(dataWritten.get(i) + dataWritten.get(i + 1).trim(),
new String(log.getLogMessage().getMessage()));
}
assertNull(reader.readLogMessageAndPosition());
reader.close();

reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", null,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null,
null);
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testReadBadMessage() throws Exception {
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");

// Open reader which cap the log message at 500 bytes
LogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 500,"localhost", null);
LogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 500,"localhost", "us-east-1a",null);
int count = 0;
for (int i = 0; i < 403; i++) {
try {
Expand Down Expand Up @@ -95,7 +95,7 @@ public void testReadLogMessageAndPosition() throws Exception {
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");

// Open reader which cap the log message at 500 bytes
LogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 500, "localhost", null);
LogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 500, "localhost", "us-east-1a",null);
try {
// Seek to start offset.
reader.setByteOffset(startOffset);
Expand All @@ -111,7 +111,7 @@ public void testReadLogMessageAndPosition() throws Exception {
}

// Open reader.
reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 16000, "localhost", null);
reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 16000, "localhost", "us-east-1a",null);
List<LogMessageAndPosition> messagesRead = Lists.newArrayListWithExpectedSize(3);
try {
// Seek to start offset.
Expand Down Expand Up @@ -145,7 +145,7 @@ public void testEnvironmentVariableInjection() throws Exception {
map.put("test", ByteBuffer.wrap("test_value".getBytes()));

// Open reader.
ThriftLogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 16000, "localhost",
ThriftLogFileReader reader = new ThriftLogFileReader(logStream, logger.getLogFile(), path, 0L, 16000, 16000, "localhost", "us-east-1a",
map);
List<LogMessageAndPosition> messagesRead = Lists.newArrayListWithExpectedSize(3);
try {
Expand All @@ -154,11 +154,12 @@ public void testEnvironmentVariableInjection() throws Exception {
while (message != null) {
messagesRead.add(message);
assertNotNull(message.getInjectedHeaders());
assertEquals(1 + 2, message.getInjectedHeaders().size());
assertEquals(1 + 3, message.getInjectedHeaders().size());
assertTrue(message.getInjectedHeaders().containsKey("test"));
assertTrue(Arrays.equals("test_value".getBytes(), message.getInjectedHeaders().get("test").array()));
assertTrue(message.getInjectedHeaders().containsKey("hostname"));
assertTrue(message.getInjectedHeaders().containsKey("file"));
assertTrue(message.getInjectedHeaders().containsKey("availabilityZone"));
message = reader.readLogMessageAndPosition();
}
} finally {
Expand Down
2 changes: 1 addition & 1 deletion thrift-logger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.76</version>
<version>0.8.0.77</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>thrift-logger</artifactId>
Expand Down
Loading