Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message transformer interface #452

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,32 @@ struct LogStreamReaderConfig {
3: optional TextReaderConfig textReaderConfig;
}

/**
* Types of message transforms that can be selected in MessageTransformerConfig.
*
* REGEX_BASED_MODIFIER: A regex based modifier that modifies the log message based on the regex
* and the modified message format (both supplied through RegexBasedModifierConfig).
**/
enum TransformType {
REGEX_BASED_MODIFIER = 0
}

/**
* Settings for the REGEX_BASED_MODIFIER transform.
**/
struct RegexBasedModifierConfig {
// The regex to match the log message. Messages that do not match are passed through unchanged.
1: required string regex;
// The modified message format. The regex captured groups can be referenced by $1, $2, etc.
2: required string modifiedMessageFormat;
// The encoding used to decode the log message and to encode the modified message.
// Defaults to UTF-8.
3: optional string encoding = "UTF-8";
// Append a newline to the end of the modified message. Defaults to true.
4: optional bool appendNewLine = true;
}

/**
* Selects which message transform to apply and carries the settings for that transform.
**/
struct MessageTransformerConfig {
// The kind of transform to apply.
1: required TransformType type;
// Settings for the regex based modifier; read when type is REGEX_BASED_MODIFIER.
2: optional RegexBasedModifierConfig regexBasedModifierConfig;
}

/**
* KAFKA: kafka writer that distributes messages to topic partitions based on
* partitioner. The default partitioner does random distribution among
Expand Down Expand Up @@ -250,6 +276,10 @@ struct SingerLogConfig {
* this log will be skipped directly when draining is enabled
*/
14: optional bool skipDraining = false;
/**
* Configuration to transform log message
*/
15: optional MessageTransformerConfig messageTransformerConfig;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public class SingerConfigDef {

public static final String TEXT_READER_FILTER_MESSAGE_REGEX = "filterMessageRegex";

public static final String RBM_REGEX = "regex";
public static final String RBM_MODIFIED_MESSAGE_FORMAT = "modifiedMessageFormat";
public static final String RBM_ENCODING = "encoding";
public static final String RBM_APPEND_NEW_LINE = "appendNewLine";

public static final String BUCKET = "bucket";
public static final String KEY_FORMAT = "keyFormat";
public static final String MAX_FILE_SIZE_MB = "maxFileSizeMB";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public class SingerMetrics {
public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions";
public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers";

public static final String SINGER_TRANSFORMER = SINGER_PREIX + "transformer.";
public static final String REGEX_BASED_MODIFIER = SINGER_TRANSFORMER + "regex_based_modifier.";

public static final String KUBE_PREFIX = SINGER_PREIX + "kube.";
public static final String KUBE_API_ERROR = KUBE_PREFIX + "api_error";
public static final String KUBE_SERVICE_ERROR = KUBE_PREFIX + "kubeservice_thread_error";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.pinterest.singer.reader.DefaultLogStreamReader;
import com.pinterest.singer.reader.TextLogFileReaderFactory;
import com.pinterest.singer.reader.ThriftLogFileReaderFactory;
import com.pinterest.singer.thrift.configuration.MessageTransformerConfig;
import com.pinterest.singer.thrift.configuration.NoOpWriteConfig;
import com.pinterest.singer.thrift.configuration.S3WriterConfig;
import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
Expand Down Expand Up @@ -280,7 +281,8 @@ private LogStreamProcessor createLogStreamProcessor(SingerLogConfig singerLogCon
LogStream logStream)
throws ConfigurationException, LogStreamReaderException, LogStreamWriterException {
LogStreamReader reader =
createLogStreamReader(logStream, singerLogConfig.getLogStreamReaderConfig());
createLogStreamReader(logStream, singerLogConfig.getLogStreamReaderConfig(),
singerLogConfig.getMessageTransformerConfig());
LogStreamWriter writer =
createLogStreamWriter(logStream, singerLogConfig.getLogStreamWriterConfig());

Expand Down Expand Up @@ -326,9 +328,11 @@ private LogStreamProcessor createLogStreamProcessor(SingerLogConfig singerLogCon
* @throws Exception when fail to create the reader for the LogStream.
*/
protected LogStreamReader createLogStreamReader(LogStream logStream,
LogStreamReaderConfig readerConfig)
LogStreamReaderConfig readerConfig,
MessageTransformerConfig messageTransformerConfig)
throws LogStreamReaderException {
switch (readerConfig.getType()) {
// No transforms available for thrift logs yet
case THRIFT:
ThriftReaderConfig thriftReaderConfig = readerConfig.getThriftReaderConfig();
return new DefaultLogStreamReader(logStream,
Expand All @@ -337,7 +341,7 @@ protected LogStreamReader createLogStreamReader(LogStream logStream,
case TEXT:
TextReaderConfig textReaderConfig = readerConfig.getTextReaderConfig();
return new DefaultLogStreamReader(logStream,
new TextLogFileReaderFactory(textReaderConfig));
new TextLogFileReaderFactory(textReaderConfig, messageTransformerConfig));

default:
throw new LogStreamReaderException("Unsupported log reader type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import com.pinterest.singer.thrift.LogMessageAndPosition;
import com.pinterest.singer.thrift.LogPosition;
import com.pinterest.singer.thrift.TextMessage;
import com.pinterest.singer.thrift.configuration.MessageTransformerConfig;
import com.pinterest.singer.thrift.configuration.TextLogMessageType;
import com.pinterest.singer.transforms.MessageTransformer;
import com.pinterest.singer.transforms.MessageTransformerFactory;
import com.pinterest.singer.utils.SingerUtils;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -57,6 +60,7 @@ public class TextLogFileReader implements LogFileReader {
private final TSerializer serializer;
private final TextMessageReader textMessageReader;
private final Pattern filterMessageRegex;
private final MessageTransformer messageTransformer;
private ByteBuffer maxBuffer;

// The text log message format, can be TextMessage, or String;
Expand Down Expand Up @@ -85,7 +89,8 @@ public TextLogFileReader(
String hostname,
String availabilityZone,
String prependFieldDelimiter,
Map<String, ByteBuffer> headers) throws Exception {
Map<String, ByteBuffer> headers,
MessageTransformerConfig messageTransformerConfig) throws Exception {
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
Preconditions.checkArgument(byteOffset >= 0);

Expand Down Expand Up @@ -114,6 +119,9 @@ public TextLogFileReader(
this.maxBuffer = ByteBuffer.allocate(capacity);
this.trimTailingNewlineCharacter = trimTailingNewlineCharacter;
this.filterMessageRegex = filterMessageRegex;
this.messageTransformer =
messageTransformerConfig != null ? MessageTransformerFactory.getTransformer(
messageTransformerConfig, logStream) : null;

// Make sure the path is still associated with the LogFile.
// This can happen when the path is reused for another LogFile during log
Expand Down Expand Up @@ -152,6 +160,12 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc
TextMessageReader.bufToString(message)).matches()) {
skipLogMessage = true;
}

// Transform message if messageTransformer is set.
if (messageTransformer != null) {
message = (ByteBuffer) messageTransformer.transform(message);
}

String prependStr = "";
if (prependTimestamp) {
prependStr += System.currentTimeMillis() + prependFieldDelimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.thrift.LogFile;
import com.pinterest.singer.thrift.configuration.MessageTransformerConfig;
import com.pinterest.singer.thrift.configuration.TextReaderConfig;
import com.pinterest.singer.utils.LogFileUtils;
import com.pinterest.singer.utils.SingerUtils;
Expand All @@ -37,9 +38,11 @@ public class TextLogFileReaderFactory implements LogFileReaderFactory {

private static final Logger LOG = LoggerFactory.getLogger(TextLogFileReaderFactory.class);
private final TextReaderConfig readerConfig;
private final MessageTransformerConfig messageTransformerConfig;

public TextLogFileReaderFactory(TextReaderConfig readerConfig) {
public TextLogFileReaderFactory(TextReaderConfig readerConfig, MessageTransformerConfig messageTransformerConfig) {
this.readerConfig = Preconditions.checkNotNull(readerConfig);
this.messageTransformerConfig = messageTransformerConfig;
}

@SuppressWarnings("resource")
Expand Down Expand Up @@ -70,7 +73,8 @@ public LogFileReader getLogFileReader(
readerConfig.getPrependFieldDelimiter(),
readerConfig.getEnvironmentVariables() != null
? new HashMap<>(readerConfig.getEnvironmentVariables())
: null);
: null,
messageTransformerConfig);
} catch (LogFileReaderException e) {
LOG.warn("Exception in getLogFileReader", e);
long inode = logFile.getInode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.pinterest.singer.transforms;

/**
 * Contract for a component that transforms messages. It can be used by LogFileReaders or other
 * components that need to rewrite a message.
 *
 * @param <T> the type of message accepted and produced by the transformer.
 */
public interface MessageTransformer<T> {

  /**
   * Applies this transformer to a single message.
   *
   * @param message the message to transform.
   * @return the transformed message.
   */
  T transform(T message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.pinterest.singer.transforms;

import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.thrift.configuration.MessageTransformerConfig;
import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig;
import com.pinterest.singer.thrift.configuration.TransformType;

import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates {@link MessageTransformer} instances from a {@link MessageTransformerConfig}.
 *
 * <p>A transformer that cannot be built from its configuration is disabled (the factory returns
 * {@code null}) and a warning is logged, rather than failing the caller.
 */
public class MessageTransformerFactory {

  private static final Logger LOG = LoggerFactory.getLogger(MessageTransformerFactory.class);

  /**
   * Builds the transformer described by {@code transformerConfig}.
   *
   * @param transformerConfig the transformer configuration; may be {@code null}.
   * @param logStream the log stream the transformer will be applied to.
   * @return the configured transformer, or {@code null} when no configuration is given, the
   *     transform type is missing or unsupported, or the transformer's own configuration is
   *     missing or rejected with a {@link ConfigurationException}.
   * @throws IllegalArgumentException if the transformer constructor rejects an argument with an
   *     unchecked exception; only {@link ConfigurationException} is converted into a disabled
   *     transformer here.
   */
  public static MessageTransformer getTransformer(MessageTransformerConfig transformerConfig,
                                                  LogStream logStream)
      throws IllegalArgumentException {
    if (transformerConfig == null) {
      return null;
    }
    TransformType transformType = transformerConfig.getType();
    if (transformType == null) {
      // Switching on a null enum would throw a NullPointerException; disable instead.
      LOG.warn("No transform type configured for log stream {}, transformer will be disabled",
          logStream.getSingerLog().getSingerLogConfig().getName());
      return null;
    }
    MessageTransformer transformer;
    try {
      switch (transformType) {
        case REGEX_BASED_MODIFIER:
          RegexBasedModifierConfig regexBasedModifierConfig =
              transformerConfig.getRegexBasedModifierConfig();
          if (regexBasedModifierConfig == null) {
            // The field is optional in the thrift struct, so it can legitimately be unset.
            throw new ConfigurationException(
                "regexBasedModifierConfig is required for transform type " + transformType);
          }
          transformer = new RegexBasedModifier(regexBasedModifierConfig, logStream);
          break;
        default:
          LOG.warn("Unsupported transform type {} for log stream {}, transformer will be disabled",
              transformType, logStream.getSingerLog().getSingerLogConfig().getName());
          transformer = null;
      }
    } catch (ConfigurationException e) {
      LOG.warn("Could not initialize transformer {} for log stream {}, it will be disabled",
          transformType, logStream.getSingerLog().getSingerLogConfig().getName(), e);
      transformer = null;
    }
    return transformer;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.pinterest.singer.transforms;

import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.common.SingerMetrics;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig;

import com.google.common.base.Preconditions;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

/**
 * A message transformer that modifies messages based on a regex pattern.
 *
 * <p>The regex is searched for in the decoded message. When it is found, the output message is
 * the configured format with every {@code $N} reference replaced by the text captured by group
 * {@code N}. Messages that do not match are returned unchanged.
 */
public class RegexBasedModifier implements MessageTransformer<ByteBuffer> {
  private static final Logger LOG = LoggerFactory.getLogger(RegexBasedModifier.class);
  private final LogStream logStream;
  private final String logName;
  private final Pattern regex;
  private final Charset encoding;
  private final String messageFormat;
  private final boolean appendNewline;

  /**
   * Creates a modifier from its configuration.
   *
   * @param config the regex, message format, encoding and newline settings.
   * @param logStream the log stream this modifier is applied to; used for metric tags and logs.
   * @throws ConfigurationException if the configured regex is not a valid pattern.
   * @throws NullPointerException if the modified message format is {@code null}.
   * @throws IllegalArgumentException if the configured encoding is rejected by
   *     {@link Charset#forName(String)}.
   */
  public RegexBasedModifier(RegexBasedModifierConfig config, LogStream logStream)
      throws ConfigurationException {
    this.messageFormat = Preconditions.checkNotNull(config.getModifiedMessageFormat());
    this.encoding = Charset.forName(config.getEncoding());
    this.logStream = logStream;
    this.logName = logStream.getSingerLog().getSingerLogConfig().getName();
    this.appendNewline = config.isAppendNewLine();
    try {
      this.regex = Pattern.compile(config.getRegex());
    } catch (PatternSyntaxException e) {
      throw new ConfigurationException("Invalid modifier regex ", e);
    }
  }

  /**
   * Transforms a message based on the regex pattern and message format.
   *
   * <p>If the message does not match the regex pattern or an exception occurs while building the
   * new message, the original message is returned so that we don't block the log stream. If the
   * message matches, each {@code $1}, {@code $2}, ... reference in the modified message format is
   * replaced by the corresponding captured group. Captured text is inserted verbatim and is never
   * re-scanned for further {@code $N} references.
   *
   * @param message the message to transform.
   * @return the transformed message or the original message if no transformation occurred.
   */
  @Override
  public ByteBuffer transform(ByteBuffer message) {
    String messageString = bufferToString(message);
    Matcher matcher = regex.matcher(messageString);
    if (!matcher.find()) {
      LOG.debug("[RegexParser] Message {} did not match regex: {}", messageString, regex);
      OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "no_message_match",
          "logName=" + logName, "file=" + logStream.getFileNamePrefix());
      return message;
    }
    try {
      if (LOG.isDebugEnabled()) {
        for (int i = 1; i <= matcher.groupCount(); i++) {
          LOG.debug("Group {}: {}", i, matcher.group(i));
        }
      }
      String result = substituteGroups(matcher);
      if (appendNewline) {
        result = result + "\n";
      }
      OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "success",
          "logName=" + logName, "file=" + logStream.getFileNamePrefix());
      return ByteBuffer.wrap(result.getBytes(encoding));
    } catch (Exception e) {
      // Best effort: never let a transform failure stop the log stream.
      LOG.warn("Failed to transform message in log stream {}, returning raw message.",
          logName, e);
      OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "failures",
          "logName=" + logName, "file=" + logStream.getFileNamePrefix());
      return message;
    }
  }

  /**
   * Expands the message format in a single left-to-right pass over the format string.
   *
   * <p>A {@code $} followed by digits is a group reference; the longest run of digits that still
   * names an existing group (1..groupCount) is used, so {@code $10} means group 10 when the regex
   * has at least ten groups and group 1 followed by a literal {@code 0} otherwise. A {@code $}
   * that does not name an existing group is copied literally. A group that did not participate
   * in the match contributes an empty string.
   *
   * @param matcher a matcher on which {@code find()} has already succeeded.
   * @return the format with all valid group references substituted.
   */
  private String substituteGroups(Matcher matcher) {
    final int groupCount = matcher.groupCount();
    final int formatLength = messageFormat.length();
    StringBuilder result = new StringBuilder(formatLength);
    int pos = 0;
    while (pos < formatLength) {
      char current = messageFormat.charAt(pos);
      if (current != '$') {
        result.append(current);
        pos++;
        continue;
      }
      int group = 0;
      int end = pos + 1;
      while (end < formatLength) {
        char next = messageFormat.charAt(end);
        if (next < '0' || next > '9') {
          break;
        }
        int candidate = group * 10 + (next - '0');
        // candidate == 0 only for a leading zero, which never names a capture group here.
        if (candidate == 0 || candidate > groupCount) {
          break;
        }
        group = candidate;
        end++;
      }
      if (group == 0) {
        // Not a valid group reference: keep the '$' as literal text.
        result.append('$');
        pos++;
        continue;
      }
      String value = matcher.group(group);
      if (value != null) {
        result.append(value);
      }
      pos = end;
    }
    return result.toString();
  }

  /**
   * Converts a ByteBuffer to a String using the configured encoding.
   *
   * <p>The bytes from buffer index 0 up to the buffer's limit are decoded; the buffer's position
   * is neither used nor modified.
   *
   * @param message the buffer holding the encoded message.
   * @return the String representation of the ByteBuffer.
   */
  private String bufferToString(ByteBuffer message) {
    if (message.hasArray()) {
      // arrayOffset() is non-zero for sliced buffers; index 0 of the buffer starts there.
      return new String(message.array(), message.arrayOffset(), message.limit(), encoding);
    }
    // Direct or read-only buffers expose no backing array; copy with absolute reads.
    byte[] bytes = new byte[message.limit()];
    for (int i = 0; i < bytes.length; i++) {
      bytes[i] = message.get(i);
    }
    return new String(bytes, encoding);
  }
}
Loading
Loading