From bb1fdf338a412c7c83ac3bab83b4d292ea655c90 Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Mon, 18 Nov 2024 17:21:12 -0800 Subject: [PATCH] add message transformer interface --- singer-commons/src/main/thrift/config.thrift | 30 +++++ .../singer/common/SingerConfigDef.java | 5 + .../singer/common/SingerMetrics.java | 3 + .../singer/monitor/DefaultLogMonitor.java | 10 +- .../singer/reader/TextLogFileReader.java | 16 ++- .../reader/TextLogFileReaderFactory.java | 8 +- .../singer/transforms/MessageTransformer.java | 16 +++ .../transforms/MessageTransformerFactory.java | 39 +++++++ .../singer/transforms/RegexBasedModifier.java | 105 ++++++++++++++++++ .../singer/utils/LogConfigUtils.java | 44 ++++++++ ...TestMemoryEfficientLogStreamProcessor.java | 2 +- .../singer/reader/TestTextLogFileReader.java | 56 ++++++++-- .../transforms/RegexBasedModifierTest.java | 67 +++++++++++ .../singer/utils/TestLogConfigUtils.java | 19 ++++ 14 files changed, 406 insertions(+), 14 deletions(-) create mode 100644 singer/src/main/java/com/pinterest/singer/transforms/MessageTransformer.java create mode 100644 singer/src/main/java/com/pinterest/singer/transforms/MessageTransformerFactory.java create mode 100644 singer/src/main/java/com/pinterest/singer/transforms/RegexBasedModifier.java create mode 100644 singer/src/test/java/com/pinterest/singer/transforms/RegexBasedModifierTest.java diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index 719845e0..f24ed750 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -83,6 +83,32 @@ struct LogStreamReaderConfig { 3: optional TextReaderConfig textReaderConfig; } +/** + * Transforms related configurations + * + * REGEX_BASED_MODIFIER: A regex based modifier that modifies the log message based on the regex + * and the modified message format. + **/ +enum TransformType { + REGEX_BASED_MODIFIER = 0 +} + +struct RegexBasedModifierConfig { + // The regex to match the log message. + 1: required string regex; + // The modified message format. The regex captured groups can be referenced by $1, $2, etc. + 2: required string modifiedMessageFormat; + // The encoding of the log message. + 3: optional string encoding = "UTF-8"; + // Append a newline to the end of the modified message. + 4: optional bool appendNewLine = true; +} + +struct MessageTransformerConfig { + 1: required TransformType type; + 2: optional RegexBasedModifierConfig regexBasedModifierConfig; +} + /** * KAFKA: kafka writer that distributes messages to topic partitions based on * partititioner. The default partitioner does random distribution among @@ -250,6 +276,10 @@ struct SingerLogConfig { * this log will be skipped directly when draining is enabled */ 14: optional bool skipDraining = false; + /** + * Configuration to transform log message + */ + 15: optional MessageTransformerConfig messageTransformerConfig; } /** diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java index b426aa59..ae879f56 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java @@ -104,6 +104,11 @@ public class SingerConfigDef { public static final String TEXT_READER_FILTER_MESSAGE_REGEX = "filterMessageRegex"; + public static final String RBM_REGEX = "regex"; + public static final String RBM_MODIFIED_MESSAGE_FORMAT = "modifiedMessageFormat"; + public static final String RBM_ENCODING = "encoding"; + public static final String RBM_APPEND_NEW_LINE = "appendNewLine"; + public static final String BUCKET = "bucket"; public static final String KEY_FORMAT = "keyFormat"; public static final String MAX_FILE_SIZE_MB = "maxFileSizeMB"; diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index f271472d..c5e31141 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -84,6 +84,9 @@ public class SingerMetrics { public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions"; public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers"; + public static final String SINGER_TRANSFORMER = SINGER_PREIX + "transformer."; + public static final String REGEX_BASED_MODIFIER = SINGER_TRANSFORMER + "regex_based_modifier."; + public static final String KUBE_PREFIX = SINGER_PREIX + "kube."; public static final String KUBE_API_ERROR = KUBE_PREFIX + "api_error"; public static final String KUBE_SERVICE_ERROR = KUBE_PREFIX + "kubeservice_thread_error"; diff --git a/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java b/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java index f08c02a5..669178c6 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java @@ -31,6 +31,7 @@ import com.pinterest.singer.reader.DefaultLogStreamReader; import com.pinterest.singer.reader.TextLogFileReaderFactory; import com.pinterest.singer.reader.ThriftLogFileReaderFactory; +import com.pinterest.singer.thrift.configuration.MessageTransformerConfig; import com.pinterest.singer.thrift.configuration.NoOpWriteConfig; import com.pinterest.singer.thrift.configuration.S3WriterConfig; import com.pinterest.singer.thrift.configuration.KafkaProducerConfig; @@ -280,7 +281,8 @@ private LogStreamProcessor createLogStreamProcessor(SingerLogConfig singerLogCon LogStream logStream) throws ConfigurationException, LogStreamReaderException, LogStreamWriterException { LogStreamReader reader = - createLogStreamReader(logStream, singerLogConfig.getLogStreamReaderConfig()); + createLogStreamReader(logStream, singerLogConfig.getLogStreamReaderConfig(), + singerLogConfig.getMessageTransformerConfig()); LogStreamWriter writer = createLogStreamWriter(logStream, singerLogConfig.getLogStreamWriterConfig()); @@ -326,9 +328,11 @@ private LogStreamProcessor createLogStreamProcessor(SingerLogConfig singerLogCon * @throws Exception when fail to create the reader for the LogStream. */ protected LogStreamReader createLogStreamReader(LogStream logStream, - LogStreamReaderConfig readerConfig) + LogStreamReaderConfig readerConfig, + MessageTransformerConfig messageTransformerConfig) throws LogStreamReaderException { switch (readerConfig.getType()) { + // No transforms available for thrift logs yet case THRIFT: ThriftReaderConfig thriftReaderConfig = readerConfig.getThriftReaderConfig(); return new DefaultLogStreamReader(logStream, @@ -337,7 +341,7 @@ protected LogStreamReader createLogStreamReader(LogStream logStream, case TEXT: TextReaderConfig textReaderConfig = readerConfig.getTextReaderConfig(); return new DefaultLogStreamReader(logStream, - new TextLogFileReaderFactory(textReaderConfig)); + new TextLogFileReaderFactory(textReaderConfig, messageTransformerConfig)); default: throw new LogStreamReaderException("Unsupported log reader type"); diff --git a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java index 1c663c81..15949128 100644 --- a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java +++ b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java @@ -22,7 +22,10 @@ import com.pinterest.singer.thrift.LogMessageAndPosition; import com.pinterest.singer.thrift.LogPosition; import com.pinterest.singer.thrift.TextMessage; +import com.pinterest.singer.thrift.configuration.MessageTransformerConfig; import com.pinterest.singer.thrift.configuration.TextLogMessageType; +import com.pinterest.singer.transforms.MessageTransformer; +import com.pinterest.singer.transforms.MessageTransformerFactory; import com.pinterest.singer.utils.SingerUtils; import com.google.common.base.Preconditions; @@ -57,6 +60,7 @@ public class TextLogFileReader implements LogFileReader { private final TSerializer serializer; private final TextMessageReader textMessageReader; private final Pattern filterMessageRegex; + private final MessageTransformer messageTransformer; private ByteBuffer maxBuffer; // The text log message format, can be TextMessage, or String; @@ -85,7 +89,8 @@ public TextLogFileReader( String hostname, String availabilityZone, String prependFieldDelimiter, - Map headers) throws Exception { + Map headers, + MessageTransformerConfig messageTransformerConfig) throws Exception { Preconditions.checkArgument(!Strings.isNullOrEmpty(path)); Preconditions.checkArgument(byteOffset >= 0); @@ -114,6 +119,9 @@ public TextLogFileReader( this.maxBuffer = ByteBuffer.allocate(capacity); this.trimTailingNewlineCharacter = trimTailingNewlineCharacter; this.filterMessageRegex = filterMessageRegex; + this.messageTransformer = + messageTransformerConfig != null ? MessageTransformerFactory.getTransformer( + messageTransformerConfig, logStream) : null; // Make sure the path is still associated with the LogFile. // This can happen when the path is reused for another LogFile during log @@ -152,6 +160,12 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc TextMessageReader.bufToString(message)).matches()) { skipLogMessage = true; } + + // Transform message if messageTransformer is set. + if (messageTransformer != null) { + message = (ByteBuffer) messageTransformer.transform(message); + } + String prependStr = ""; if (prependTimestamp) { prependStr += System.currentTimeMillis() + prependFieldDelimiter; diff --git a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java index 2e93bc8f..525d6734 100644 --- a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java +++ b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java @@ -18,6 +18,7 @@ import com.pinterest.singer.common.LogStream; import com.pinterest.singer.common.SingerSettings; import com.pinterest.singer.thrift.LogFile; +import com.pinterest.singer.thrift.configuration.MessageTransformerConfig; import com.pinterest.singer.thrift.configuration.TextReaderConfig; import com.pinterest.singer.utils.LogFileUtils; import com.pinterest.singer.utils.SingerUtils; @@ -37,9 +38,11 @@ public class TextLogFileReaderFactory implements LogFileReaderFactory { private static final Logger LOG = LoggerFactory.getLogger(TextLogFileReaderFactory.class); private final TextReaderConfig readerConfig; + private final MessageTransformerConfig messageTransformerConfig; - public TextLogFileReaderFactory(TextReaderConfig readerConfig) { + public TextLogFileReaderFactory(TextReaderConfig readerConfig, MessageTransformerConfig messageTransformerConfig) { this.readerConfig = Preconditions.checkNotNull(readerConfig); + this.messageTransformerConfig = messageTransformerConfig; } @SuppressWarnings("resource") @@ -70,7 +73,8 @@ public LogFileReader getLogFileReader( readerConfig.getPrependFieldDelimiter(), readerConfig.getEnvironmentVariables() != null ? new HashMap<>(readerConfig.getEnvironmentVariables()) - : null); + : null, + messageTransformerConfig); } catch (LogFileReaderException e) { LOG.warn("Exception in getLogFileReader", e); long inode = logFile.getInode(); diff --git a/singer/src/main/java/com/pinterest/singer/transforms/MessageTransformer.java b/singer/src/main/java/com/pinterest/singer/transforms/MessageTransformer.java new file mode 100644 index 00000000..80718dfd --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/transforms/MessageTransformer.java @@ -0,0 +1,16 @@ +package com.pinterest.singer.transforms; + +/** + * Represents a message transformer that can be used by LogFileReaders or other components to + * transform messages. + */ +public interface MessageTransformer { + + /** + * Transform the message. + * + * @param message the message to transform. + * @return the transformed message. + */ + public T transform(T message); +} diff --git a/singer/src/main/java/com/pinterest/singer/transforms/MessageTransformerFactory.java b/singer/src/main/java/com/pinterest/singer/transforms/MessageTransformerFactory.java new file mode 100644 index 00000000..26e49927 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/transforms/MessageTransformerFactory.java @@ -0,0 +1,39 @@ +package com.pinterest.singer.transforms; + +import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.thrift.configuration.MessageTransformerConfig; +import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig; +import com.pinterest.singer.thrift.configuration.TransformType; + +import org.apache.commons.configuration.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageTransformerFactory { + + private static final Logger LOG = LoggerFactory.getLogger(MessageTransformerFactory.class); + + public static MessageTransformer getTransformer(MessageTransformerConfig transformerConfig, LogStream logStream) + throws IllegalArgumentException { + if (transformerConfig == null) { + return null; + } + TransformType transformType = transformerConfig.getType(); + MessageTransformer transformer; + try { + switch (transformType) { + case REGEX_BASED_MODIFIER: + RegexBasedModifierConfig regexBasedModifierConfig = transformerConfig.getRegexBasedModifierConfig(); + transformer = new RegexBasedModifier(regexBasedModifierConfig, logStream); + break; + default: + transformer = null; + } + } catch (ConfigurationException e) { + LOG.warn("Could not initialize transformer {} for log stream {}, it will be disabled", transformType, + logStream.getSingerLog().getSingerLogConfig().getName(), e); + transformer = null; + } + return transformer; + } +} diff --git a/singer/src/main/java/com/pinterest/singer/transforms/RegexBasedModifier.java b/singer/src/main/java/com/pinterest/singer/transforms/RegexBasedModifier.java new file mode 100644 index 00000000..ac440de8 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/transforms/RegexBasedModifier.java @@ -0,0 +1,105 @@ +package com.pinterest.singer.transforms; + +import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerMetrics; +import com.pinterest.singer.metrics.OpenTsdbMetricConverter; +import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig; + +import com.google.common.base.Preconditions; +import org.apache.commons.configuration.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * A message transformer that modifies messages based on a regex pattern. + * This transformer uses a regex pattern to match a message and then replaces the matched groups in + * the message with the specified format. + */ +public class RegexBasedModifier implements MessageTransformer { + private static final Logger LOG = LoggerFactory.getLogger(RegexBasedModifier.class); + private final LogStream logStream; + private final String logName; + private final Pattern regex; + private final Charset encoding; + private final String messageFormat; + private final boolean appendNewline; + + public RegexBasedModifier(RegexBasedModifierConfig config, LogStream logStream) + throws ConfigurationException { + this.messageFormat = Preconditions.checkNotNull(config.getModifiedMessageFormat()); + this.encoding = Charset.forName(config.getEncoding()); + this.logStream = logStream; + this.logName = logStream.getSingerLog().getSingerLogConfig().getName(); + this.appendNewline = config.isAppendNewLine(); + try { + this.regex = Pattern.compile(config.getRegex()); + } catch (PatternSyntaxException e) { + throw new ConfigurationException("Invalid modifier regex ", e); + } + } + + /** + * Transforms a message based on the regex pattern and message format. + * If the message does not match the regex pattern or an exception occurs, the original message + * is returned so that we don't block the log stream. If the message matches the regex pattern, + * the matched groups are replaced in the message format, each group should be referenced in the + * modified message format using $1, $2, etc. + * + * @param message the message to transform. + * @return the transformed message or the original message if no transformation occurred. + */ + @Override + public ByteBuffer transform(ByteBuffer message) { + String messageString = bufferToString(message); + Matcher matcher = regex.matcher(messageString); + if (!matcher.find()) { + LOG.debug("[RegexParser] Message " + messageString + " did not match regex: " + regex); + OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "no_message_match", + "logName=" + "file=" + logStream.getFileNamePrefix()); + return message; + } + try { + Map groupMap = new HashMap<>(); + for (int i = 1; i <= matcher.groupCount(); i++) { + groupMap.put("$" + i, matcher.group(i)); + LOG.debug("Group " + i + ": " + matcher.group(i)); + } + StringBuilder result = new StringBuilder(messageFormat); + groupMap.forEach((groupIndex, value) -> { + int start; + while ((start = result.indexOf(groupIndex)) != -1) { + result.replace(start, start + groupIndex.length(), value); + } + }); + if (appendNewline) result.append("\n"); + OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "success", + "logName=" + logName, "file=" + logStream.getFileNamePrefix()); + return ByteBuffer.wrap(result.toString().getBytes(encoding)); + } catch (Exception e) { + LOG.warn("Failed to transform message in log stream {}, returning raw message.", + logName, e); + OpenTsdbMetricConverter.incr(SingerMetrics.REGEX_BASED_MODIFIER + "failures", + "logName=" + logName, "file=" + logStream.getFileNamePrefix()); + return message; + } + } + + /** + * Converts a ByteBuffer to a String using the specified encoding. + * + * @param message + * @return the String representation of the ByteBuffer. + */ + private String bufferToString(ByteBuffer message) { + String string = new String(message.array(), 0, message.limit(), encoding); + return string; + } +} \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index 280dab44..00d17285 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -26,6 +26,7 @@ import com.pinterest.singer.loggingaudit.thrift.configuration.AuditConfig; import com.pinterest.singer.metrics.StatsPusher; import com.pinterest.singer.thrift.configuration.AdminConfig; +import com.pinterest.singer.thrift.configuration.MessageTransformerConfig; import com.pinterest.singer.thrift.configuration.NoOpWriteConfig; import com.pinterest.singer.thrift.configuration.FileNameMatchMode; import com.pinterest.singer.thrift.configuration.HeartbeatWriterConfig; @@ -43,6 +44,7 @@ import com.pinterest.singer.thrift.configuration.ReaderType; import com.pinterest.singer.thrift.configuration.RealpinObjectType; import com.pinterest.singer.thrift.configuration.RealpinWriterConfig; +import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig; import com.pinterest.singer.thrift.configuration.S3WriterConfig; import com.pinterest.singer.thrift.configuration.SamplingType; import com.pinterest.singer.thrift.configuration.SingerConfig; @@ -51,6 +53,7 @@ import com.pinterest.singer.thrift.configuration.TextLogMessageType; import com.pinterest.singer.thrift.configuration.TextReaderConfig; import com.pinterest.singer.thrift.configuration.ThriftReaderConfig; +import com.pinterest.singer.thrift.configuration.TransformType; import com.pinterest.singer.thrift.configuration.WriterType; import com.google.common.base.Joiner; @@ -554,6 +557,12 @@ public static SingerLogConfig parseLogConfig(String logName, } } + // initialize and set transformer config + if (logConfiguration.containsKey("transformer.type")) { + config.setMessageTransformerConfig( + parseMessageTransformerConfig(new SubsetConfiguration(logConfiguration, "transformer."))); + } + FileNameMatchMode matchMode = FileNameMatchMode.PREFIX; String matchModeStr = logConfiguration.getString("logFileMatchMode"); if (matchModeStr != null) { @@ -1275,6 +1284,41 @@ protected static TextReaderConfig parseTextReaderConfig(AbstractConfiguration te return config; } + protected static MessageTransformerConfig parseMessageTransformerConfig(AbstractConfiguration transformerConfiguration) + throws ConfigurationException { + String messageTransformerType = transformerConfiguration.getString("type"); + TransformType type = TransformType.valueOf(messageTransformerType.toUpperCase()); + MessageTransformerConfig + messageTransformerConfig = + new MessageTransformerConfig(type); + + if (type.equals(TransformType.REGEX_BASED_MODIFIER)) { + RegexBasedModifierConfig regexBasedModifierConfig = parseRegexBasedModifierConfig( + new SubsetConfiguration(transformerConfiguration, messageTransformerType + ".")); + messageTransformerConfig.setRegexBasedModifierConfig(regexBasedModifierConfig); + } + return messageTransformerConfig; + } + + protected static RegexBasedModifierConfig parseRegexBasedModifierConfig(AbstractConfiguration regexBasedModifierConfig) { + regexBasedModifierConfig.setThrowExceptionOnMissing(true); + String regex = regexBasedModifierConfig.getString(SingerConfigDef.RBM_REGEX); + String modifiedMessageFormat = regexBasedModifierConfig.getString(SingerConfigDef.RBM_MODIFIED_MESSAGE_FORMAT); + regexBasedModifierConfig.setThrowExceptionOnMissing(false); + + RegexBasedModifierConfig config = new RegexBasedModifierConfig(); + config.setRegex(regex); + config.setModifiedMessageFormat(modifiedMessageFormat); + + if (regexBasedModifierConfig.containsKey(SingerConfigDef.RBM_ENCODING)) { + config.setEncoding(regexBasedModifierConfig.getString(SingerConfigDef.RBM_ENCODING)); + } + if (regexBasedModifierConfig.containsKey(SingerConfigDef.RBM_APPEND_NEW_LINE)) { + config.setAppendNewLine(regexBasedModifierConfig.getBoolean(SingerConfigDef.RBM_APPEND_NEW_LINE)); + } + return config; + } + protected static LogStreamProcessorConfig parseLogStreamProcessorConfig(AbstractConfiguration processorConfiguration) { processorConfiguration.setThrowExceptionOnMissing(true); long minIntervalInMillis; diff --git a/singer/src/test/java/com/pinterest/singer/processor/TestMemoryEfficientLogStreamProcessor.java b/singer/src/test/java/com/pinterest/singer/processor/TestMemoryEfficientLogStreamProcessor.java index 62e07363..e3a1215f 100644 --- a/singer/src/test/java/com/pinterest/singer/processor/TestMemoryEfficientLogStreamProcessor.java +++ b/singer/src/test/java/com/pinterest/singer/processor/TestMemoryEfficientLogStreamProcessor.java @@ -492,7 +492,7 @@ public void testProcessLogStreamWithMessagesSkipped() throws Exception { textReaderConfig.setTextLogMessageType(TextLogMessageType.PLAIN_TEXT); LogStreamReader logStreamReader = new DefaultLogStreamReader( logStream, - new TextLogFileReaderFactory(textReaderConfig)); + new TextLogFileReaderFactory(textReaderConfig, null)); MemoryEfficientLogStreamProcessor processor = new MemoryEfficientLogStreamProcessor(logStream, null, logStreamReader, writer, processorBatchSize, processingIntervalInMillisMin, processingIntervalInMillisMax, processingTimeSliceInMilliseconds, logRetentionInSecs, false); diff --git a/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java b/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java index a8a734a6..ea03fe89 100644 --- a/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java +++ b/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java @@ -18,9 +18,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Pattern; @@ -34,8 +36,11 @@ import com.pinterest.singer.common.SingerLog; import com.pinterest.singer.thrift.LogFile; import com.pinterest.singer.thrift.LogMessageAndPosition; +import com.pinterest.singer.thrift.configuration.MessageTransformerConfig; +import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig; import com.pinterest.singer.thrift.configuration.SingerLogConfig; import com.pinterest.singer.thrift.configuration.TextLogMessageType; +import com.pinterest.singer.thrift.configuration.TransformType; import com.pinterest.singer.utils.SingerUtils; import com.pinterest.singer.utils.TextLogger; @@ -51,7 +56,7 @@ public void testReadLogMessageAndPosition() throws Exception { LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test"); LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1, Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, true, null, null, - null, null); + null, null, null); for (int i = 0; i < 100; i++) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); assertEquals(dataWritten.get(i).trim(), new String(log.getLogMessage().getMessage())); @@ -71,7 +76,7 @@ public void testReadLogMessageAndPositionWithHostname() throws Exception { LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test"); LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1, Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, true, false, hostname, "n/a", - delimiter, null); + delimiter, null, null); for (int i = 0; i < 100; i++) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); String expected = hostname + delimiter + dataWritten.get(i); @@ -92,7 +97,7 @@ public void testReadLogMessageAndPositionMultiRead() throws Exception { LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test"); LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2, Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, true, null, "n/a", - null, null); + null, null, null); for (int i = 0; i < 100; i = i + 2) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); assertEquals(dataWritten.get(i) + dataWritten.get(i + 1).trim(), @@ -102,6 +107,43 @@ public void testReadLogMessageAndPositionMultiRead() throws Exception { reader.close(); } + @Test + public void testReadWithTransformEnabled() throws Exception { + String path = FilenameUtils.concat(getTempPath(), "test2.log"); + MessageTransformerConfig transformConfig = new MessageTransformerConfig(); + RegexBasedModifierConfig regexBasedModifierConfig = new RegexBasedModifierConfig(); + regexBasedModifierConfig.setRegex("(?s)^(.+?) (stdout|stderr) (F|P) (.*)$"); + regexBasedModifierConfig.setModifiedMessageFormat("Log: $4, Timestamp: $1"); + regexBasedModifierConfig.setAppendNewLine(false); + transformConfig.setType(TransformType.REGEX_BASED_MODIFIER); + transformConfig.setRegexBasedModifierConfig(regexBasedModifierConfig); + + Map> messages = new HashMap<>(); + TextLogger logger = new TextLogger(path); + for (int i= 0; i < 100; i++) { + String timestamp = new Timestamp(System.currentTimeMillis()).toString(); + String message = UUID.randomUUID() + "\n"; + messages.put(i, new ArrayList<>()); + messages.get(i).add(message); + messages.get(i).add(timestamp); + logger.logText(timestamp + " stdout F " + message); + } + + long inode = SingerUtils.getFileInode(SingerUtils.getPath(path)); + LogFile logFile = new LogFile(inode); + LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test"); + LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1, + Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, false, null, "n/a", null, + null, transformConfig); + + for (int i = 0; i < 100; i++) { + LogMessageAndPosition log = reader.readLogMessageAndPosition(); + assertEquals("Log: " + messages.get(i).get(0) + ", Timestamp: " + messages.get(i).get(1), + new String(log.getLogMessage().getMessage())); + } + reader.close(); + } + @Test public void testReadMessagesWithFilterRegexEnabled() throws Exception { String path = FilenameUtils.concat(getTempPath(), "test_filtered.log"); @@ -121,7 +163,7 @@ public void testReadMessagesWithFilterRegexEnabled() throws Exception { LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1, Pattern.compile("^.*$"), Pattern.compile(filterRegex, Pattern.DOTALL), TextLogMessageType.PLAIN_TEXT, false, false, false, null, null, - null, null); + null, null, null); for (int i = 0; i < 100; i++) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); if (i % 2 == 0) { @@ -139,7 +181,7 @@ public void testReadMessagesWithFilterRegexEnabled() throws Exception { reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1, Pattern.compile("^.*$"), Pattern.compile(filterRegex, Pattern.DOTALL), TextLogMessageType.PLAIN_TEXT, false, false, false, "test", "test-az", - null, new HashMap<>()); + null, new HashMap<>(), null); // No messages should have skipMessageHeader for (int i = 0; i < 100; i++) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); @@ -158,7 +200,7 @@ public void testEnvironmentVariableInjection() throws Exception { LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test"); LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2, Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null, - new HashMap<>(ImmutableMap.of("test", ByteBuffer.wrap("value".getBytes())))); + new HashMap<>(ImmutableMap.of("test", ByteBuffer.wrap("value".getBytes()))), null); for (int i = 0; i < 100; i = i + 2) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); assertEquals(4, log.getInjectedHeadersSize()); @@ -174,7 +216,7 @@ public void testEnvironmentVariableInjection() throws Exception { reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2, Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null, - null); + null, null); for (int i = 0; i < 100; i = i + 2) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); assertEquals(0, log.getInjectedHeadersSize()); diff --git a/singer/src/test/java/com/pinterest/singer/transforms/RegexBasedModifierTest.java b/singer/src/test/java/com/pinterest/singer/transforms/RegexBasedModifierTest.java new file mode 100644 index 00000000..93e4390d --- /dev/null +++ b/singer/src/test/java/com/pinterest/singer/transforms/RegexBasedModifierTest.java @@ -0,0 +1,67 @@ +package com.pinterest.singer.transforms; + +import com.pinterest.singer.SingerTestBase; +import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerLog; +import com.pinterest.singer.thrift.configuration.MessageTransformerConfig; +import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig; +import com.pinterest.singer.thrift.configuration.SingerLogConfig; +import com.pinterest.singer.thrift.configuration.TransformType; + +import com.google.common.base.Charsets; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class RegexBasedModifierTest extends SingerTestBase { + + MessageTransformerConfig messageTransformerConfig; + RegexBasedModifierConfig regexBasedModifierConfig; + LogStream logStream; + + @Override + public void setUp() { + messageTransformerConfig = new MessageTransformerConfig(); + messageTransformerConfig.setType(TransformType.REGEX_BASED_MODIFIER); + regexBasedModifierConfig = new RegexBasedModifierConfig(); + SingerLog singerLog = new SingerLog( + new SingerLogConfig("test", "test_dir", "test.log", null, null, null)); + logStream = new LogStream(singerLog, "test.log"); + } + + @Override + public void tearDown() { + messageTransformerConfig = null; + regexBasedModifierConfig = null; + } + + @Test + public void testSimpleLogExtraction() throws Exception { + regexBasedModifierConfig.setRegex("\\[(?.*?)\\] (?.*)"); + regexBasedModifierConfig.setModifiedMessageFormat("{Thread: \"$1\", Log: \"$2\"}"); + regexBasedModifierConfig.setEncoding("UTF-8"); + regexBasedModifierConfig.setAppendNewLine(false); + messageTransformerConfig.setRegexBasedModifierConfig(regexBasedModifierConfig); + RegexBasedModifier regexBasedModifier = new RegexBasedModifier(regexBasedModifierConfig, logStream); + + String logMessage = "[SingerMain] Starting Singer..."; + ByteBuffer logMessageBuf = regexBasedModifier.transform(ByteBuffer.wrap(logMessage.getBytes())); + + String expected = "{Thread: \"SingerMain\", Log: \"Starting Singer...\"}"; + assertEquals(expected, new String(logMessageBuf.array(), 0, logMessageBuf.limit(), Charsets.UTF_8)); + } + + @Test + public void testNoRegexMatch() throws Exception { + regexBasedModifierConfig.setRegex("singer_(\\d+).log"); + regexBasedModifierConfig.setModifiedMessageFormat("singer log number: $1"); + regexBasedModifierConfig.setEncoding("UTF-8"); + messageTransformerConfig.setRegexBasedModifierConfig(regexBasedModifierConfig); + RegexBasedModifier regexBasedModifier = new RegexBasedModifier(regexBasedModifierConfig, logStream); + + String logMessage = "This is a sample message"; + ByteBuffer logMessageBuf = ByteBuffer.wrap(logMessage.getBytes()); + assertEquals(logMessageBuf, regexBasedModifier.transform(logMessageBuf)); + } + +} diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java index 6a23befc..6c915957 100644 --- a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java +++ b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java @@ -51,6 +51,7 @@ import com.pinterest.singer.thrift.configuration.LogStreamProcessorConfig; import com.pinterest.singer.thrift.configuration.MemqWriterConfig; import com.pinterest.singer.thrift.configuration.RealpinWriterConfig; +import com.pinterest.singer.thrift.configuration.RegexBasedModifierConfig; import com.pinterest.singer.thrift.configuration.S3WriterConfig; import com.pinterest.singer.thrift.configuration.SamplingType; import com.pinterest.singer.thrift.configuration.TextReaderConfig; @@ -467,4 +468,22 @@ public void testS3WriterConfigurations() throws Exception { assertEquals(30, s3WriterConfig.getMinUploadTimeInSeconds()); assertEquals(10, s3WriterConfig.getMaxRetries()); } + + @Test + public void testRegexTransformerConfigurations() throws Exception { + String + config = + "type=regex_based_modifier\n" + + "modifiedMessageFormat={log_level: $2\\, message: $4\\, timestamp: $1}\n" + + "regex_based_modifier.encoding=UTF-8\n" + + "regex=some_regex\n"; + PropertiesConfiguration conf = new PropertiesConfiguration(); + conf.load(new ByteArrayInputStream(config.getBytes())); + + RegexBasedModifierConfig + regexBasedModifierConfig = + LogConfigUtils.parseRegexBasedModifierConfig(conf); + + assertNotNull(regexBasedModifierConfig); + } }