Skip to content

Commit

Permalink
Merge pull request #452 from jfzunigac/feature/transforms
Browse files Browse the repository at this point in the history
Add message transformer interface
  • Loading branch information
jfzunigac authored Nov 19, 2024
2 parents 6ec6e08 + bb1fdf3 commit 650880f
Show file tree
Hide file tree
Showing 14 changed files with 406 additions and 14 deletions.
30 changes: 30 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,32 @@ struct LogStreamReaderConfig {
3: optional TextReaderConfig textReaderConfig;
}

/**
* Transforms related configurations
*
* REGEX_BASED_MODIFIER: A regex based modifier that modifies the log message based on the regex
* and the modified message format.
**/
enum TransformType {
REGEX_BASED_MODIFIER = 0
}

struct RegexBasedModifierConfig {
// The regex to match the log message.
1: required string regex;
// The modified message format. The regex captured groups can be referenced by $1, $2, etc.
2: required string modifiedMessageFormat;
// The encoding of the log message.
3: optional string encoding = "UTF-8";
// Append a newline to the end of the modified message.
4: optional bool appendNewLine = true;
}

struct MessageTransformerConfig {
1: required TransformType type;
2: optional RegexBasedModifierConfig regexBasedModifierConfig;
}

/**
* KAFKA: kafka writer that distributes messages to topic partitions based on
* partititioner. The default partitioner does random distribution among
Expand Down Expand Up @@ -250,6 +276,10 @@ struct SingerLogConfig {
* this log will be skipped directly when draining is enabled
*/
14: optional bool skipDraining = false;
/**
* Configuration to transform log message
*/
15: optional MessageTransformerConfig messageTransformerConfig;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public class SingerConfigDef {

public static final String TEXT_READER_FILTER_MESSAGE_REGEX = "filterMessageRegex";

public static final String RBM_REGEX = "regex";
public static final String RBM_MODIFIED_MESSAGE_FORMAT = "modifiedMessageFormat";
public static final String RBM_ENCODING = "encoding";
public static final String RBM_APPEND_NEW_LINE = "appendNewLine";

public static final String BUCKET = "bucket";
public static final String KEY_FORMAT = "keyFormat";
public static final String MAX_FILE_SIZE_MB = "maxFileSizeMB";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public class SingerMetrics {
public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions";
public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers";

public static final String SINGER_TRANSFORMER = SINGER_PREIX + "transformer.";
public static final String REGEX_BASED_MODIFIER = SINGER_TRANSFORMER + "regex_based_modifier.";

public static final String KUBE_PREFIX = SINGER_PREIX + "kube.";
public static final String KUBE_API_ERROR = KUBE_PREFIX + "api_error";
public static final String KUBE_SERVICE_ERROR = KUBE_PREFIX + "kubeservice_thread_error";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.pinterest.singer.reader.DefaultLogStreamReader;
import com.pinterest.singer.reader.TextLogFileReaderFactory;
import com.pinterest.singer.reader.ThriftLogFileReaderFactory;
import com.pinterest.singer.thrift.configuration.MessageTransformerConfig;
import com.pinterest.singer.thrift.configuration.NoOpWriteConfig;
import com.pinterest.singer.thrift.configuration.S3WriterConfig;
import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
Expand Down Expand Up @@ -280,7 +281,8 @@ private LogStreamProcessor createLogStreamProcessor(SingerLogConfig singerLogCon
LogStream logStream)
throws ConfigurationException, LogStreamReaderException, LogStreamWriterException {
LogStreamReader reader =
createLogStreamReader(logStream, singerLogConfig.getLogStreamReaderConfig());
createLogStreamReader(logStream, singerLogConfig.getLogStreamReaderConfig(),
singerLogConfig.getMessageTransformerConfig());
LogStreamWriter writer =
createLogStreamWriter(logStream, singerLogConfig.getLogStreamWriterConfig());

Expand Down Expand Up @@ -326,9 +328,11 @@ private LogStreamProcessor createLogStreamProcessor(SingerLogConfig singerLogCon
* @throws Exception when fail to create the reader for the LogStream.
*/
protected LogStreamReader createLogStreamReader(LogStream logStream,
LogStreamReaderConfig readerConfig)
LogStreamReaderConfig readerConfig,
MessageTransformerConfig messageTransformerConfig)
throws LogStreamReaderException {
switch (readerConfig.getType()) {
// No transforms available for thrift logs yet
case THRIFT:
ThriftReaderConfig thriftReaderConfig = readerConfig.getThriftReaderConfig();
return new DefaultLogStreamReader(logStream,
Expand All @@ -337,7 +341,7 @@ protected LogStreamReader createLogStreamReader(LogStream logStream,
case TEXT:
TextReaderConfig textReaderConfig = readerConfig.getTextReaderConfig();
return new DefaultLogStreamReader(logStream,
new TextLogFileReaderFactory(textReaderConfig));
new TextLogFileReaderFactory(textReaderConfig, messageTransformerConfig));

default:
throw new LogStreamReaderException("Unsupported log reader type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import com.pinterest.singer.thrift.LogMessageAndPosition;
import com.pinterest.singer.thrift.LogPosition;
import com.pinterest.singer.thrift.TextMessage;
import com.pinterest.singer.thrift.configuration.MessageTransformerConfig;
import com.pinterest.singer.thrift.configuration.TextLogMessageType;
import com.pinterest.singer.transforms.MessageTransformer;
import com.pinterest.singer.transforms.MessageTransformerFactory;
import com.pinterest.singer.utils.SingerUtils;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -57,6 +60,7 @@ public class TextLogFileReader implements LogFileReader {
private final TSerializer serializer;
private final TextMessageReader textMessageReader;
private final Pattern filterMessageRegex;
private final MessageTransformer messageTransformer;
private ByteBuffer maxBuffer;

// The text log message format, can be TextMessage, or String;
Expand Down Expand Up @@ -85,7 +89,8 @@ public TextLogFileReader(
String hostname,
String availabilityZone,
String prependFieldDelimiter,
Map<String, ByteBuffer> headers) throws Exception {
Map<String, ByteBuffer> headers,
MessageTransformerConfig messageTransformerConfig) throws Exception {
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
Preconditions.checkArgument(byteOffset >= 0);

Expand Down Expand Up @@ -114,6 +119,9 @@ public TextLogFileReader(
this.maxBuffer = ByteBuffer.allocate(capacity);
this.trimTailingNewlineCharacter = trimTailingNewlineCharacter;
this.filterMessageRegex = filterMessageRegex;
this.messageTransformer =
messageTransformerConfig != null ? MessageTransformerFactory.getTransformer(
messageTransformerConfig, logStream) : null;

// Make sure the path is still associated with the LogFile.
// This can happen when the path is reused for another LogFile during log
Expand Down Expand Up @@ -152,6 +160,12 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc
TextMessageReader.bufToString(message)).matches()) {
skipLogMessage = true;
}

// Transform message if messageTransformer is set.
if (messageTransformer != null) {
message = (ByteBuffer) messageTransformer.transform(message);
}

String prependStr = "";
if (prependTimestamp) {
prependStr += System.currentTimeMillis() + prependFieldDelimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.thrift.LogFile;
import com.pinterest.singer.thrift.configuration.MessageTransformerConfig;
import com.pinterest.singer.thrift.configuration.TextReaderConfig;
import com.pinterest.singer.utils.LogFileUtils;
import com.pinterest.singer.utils.SingerUtils;
Expand All @@ -37,9 +38,11 @@ public class TextLogFileReaderFactory implements LogFileReaderFactory {

private static final Logger LOG = LoggerFactory.getLogger(TextLogFileReaderFactory.class);
private final TextReaderConfig readerConfig;
private final MessageTransformerConfig messageTransformerConfig;

public TextLogFileReaderFactory(TextReaderConfig readerConfig) {
public TextLogFileReaderFactory(TextReaderConfig readerConfig, MessageTransformerConfig messageTransformerConfig) {
this.readerConfig = Preconditions.checkNotNull(readerConfig);
this.messageTransformerConfig = messageTransformerConfig;
}

@SuppressWarnings("resource")
Expand Down Expand Up @@ -70,7 +73,8 @@ public LogFileReader getLogFileReader(
readerConfig.getPrependFieldDelimiter(),
readerConfig.getEnvironmentVariables() != null
? new HashMap<>(readerConfig.getEnvironmentVariables())
: null);
: null,
messageTransformerConfig);
} catch (LogFileReaderException e) {
LOG.warn("Exception in getLogFileReader", e);
long inode = logFile.getInode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.pinterest.singer.transforms;

/**
* Represents a message transformer that can be used by LogFileReaders or other components to
* transform messages.
*/
public interface MessageTransformer<T> {

/**
* Transform the message.
*
* @param message the message to transform.
* @return the transformed message.
*/
public T transform(T message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.pinterest.singer.transforms;

import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.thrift.configuration.MessageTransformerConfig;
import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig;
import com.pinterest.singer.thrift.configuration.TransformType;

import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageTransformerFactory {

private static final Logger LOG = LoggerFactory.getLogger(MessageTransformerFactory.class);

public static MessageTransformer getTransformer(MessageTransformerConfig transformerConfig, LogStream logStream)
throws IllegalArgumentException {
if (transformerConfig == null) {
return null;
}
TransformType transformType = transformerConfig.getType();
MessageTransformer transformer;
try {
switch (transformType) {
case REGEX_BASED_MODIFIER:
RegexBasedModifierConfig regexBasedModifierConfig = transformerConfig.getRegexBasedModifierConfig();
transformer = new RegexBasedModifier(regexBasedModifierConfig, logStream);
break;
default:
transformer = null;
}
} catch (ConfigurationException e) {
LOG.warn("Could not initialize transformer {} for log stream {}, it will be disabled", transformType,
logStream.getSingerLog().getSingerLogConfig().getName(), e);
transformer = null;
}
return transformer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.pinterest.singer.transforms;

import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.common.SingerMetrics;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig;

import com.google.common.base.Preconditions;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

/**
* A message transformer that modifies messages based on a regex pattern.
* This transformer uses a regex pattern to match a message and then replaces the matched groups in
* the message with the specified format.
*/
public class RegexBasedModifier implements MessageTransformer<ByteBuffer> {
private static final Logger LOG = LoggerFactory.getLogger(RegexBasedModifier.class);
private final LogStream logStream;
private final String logName;
private final Pattern regex;
private final Charset encoding;
private final String messageFormat;
private final boolean appendNewline;

public RegexBasedModifier(RegexBasedModifierConfig config, LogStream logStream)
throws ConfigurationException {
this.messageFormat = Preconditions.checkNotNull(config.getModifiedMessageFormat());
this.encoding = Charset.forName(config.getEncoding());
this.logStream = logStream;
this.logName = logStream.getSingerLog().getSingerLogConfig().getName();
this.appendNewline = config.isAppendNewLine();
try {
this.regex = Pattern.compile(config.getRegex());
} catch (PatternSyntaxException e) {
throw new ConfigurationException("Invalid modifier regex ", e);
}
}

/**
* Transforms a message based on the regex pattern and message format.
* If the message does not match the regex pattern or an exception occurs, the original message
* is returned so that we don't block the log stream. If the message matches the regex pattern,
* the matched groups are replaced in the message format, each group should be referenced in the
* modified message format using $1, $2, etc.
*
* @param message the message to transform.
* @return the transformed message or the original message if no transformation occurred.
*/
@Override
public ByteBuffer transform(ByteBuffer message) {
String messageString = bufferToString(message);
Matcher matcher = regex.matcher(messageString);
if (!matcher.find()) {
LOG.debug("[RegexParser] Message " + messageString + " did not match regex: " + regex);
OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "no_message_match",
"logName=" + "file=" + logStream.getFileNamePrefix());
return message;
}
try {
Map<String, String> groupMap = new HashMap<>();
for (int i = 1; i <= matcher.groupCount(); i++) {
groupMap.put("$" + i, matcher.group(i));
LOG.debug("Group " + i + ": " + matcher.group(i));
}
StringBuilder result = new StringBuilder(messageFormat);
groupMap.forEach((groupIndex, value) -> {
int start;
while ((start = result.indexOf(groupIndex)) != -1) {
result.replace(start, start + groupIndex.length(), value);
}
});
if (appendNewline) result.append("\n");
OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "success",
"logName=" + logName, "file=" + logStream.getFileNamePrefix());
return ByteBuffer.wrap(result.toString().getBytes(encoding));
} catch (Exception e) {
LOG.warn("Failed to transform message in log stream {}, returning raw message.",
logName, e);
OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "failures",
"logName=" + logName, "file=" + logStream.getFileNamePrefix());
return message;
}
}

/**
* Converts a ByteBuffer to a String using the specified encoding.
*
* @param message
* @return the String representation of the ByteBuffer.
*/
private String bufferToString(ByteBuffer message) {
String string = new String(message.array(), 0, message.limit(), encoding);
return string;
}
}
Loading

0 comments on commit 650880f

Please sign in to comment.