Skip to content

Commit

Permalink
[INLONG-10938][SDK] Transform SQL supports fuzzy matching of LIKE and…
Browse files Browse the repository at this point in the history
… NOT LIKE
  • Loading branch information
ZKpLo committed Sep 6, 2024
1 parent edf93bd commit 585dae1
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,8 @@ public static String parseChar(String charStr) {
}
return charStr;
}

/**
 * Tells whether the given object is absent or has an empty string form.
 *
 * @param str the object to inspect; may be {@code null}
 * @return {@code true} when {@code str} is {@code null} or its {@code toString()} result has length zero
 */
public static boolean isEmpty(Object str) {
    // A missing reference is treated exactly like an empty string.
    if (str == null) {
        return true;
    }
    final String text = str.toString();
    return text.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ private static void init() {
}

public static ValueParser getTransformFunction(Function func) {
if (func == null) {
return null;
}
String functionName = func.getName();
Class<?> clazz = functionMap.get(functionName);
if (clazz == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.parser;

import org.apache.inlong.common.util.StringUtil;
import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;

import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.expression.operators.relational.LikeExpression;

import java.util.regex.Pattern;

/**
 * LikeParser
 * <p>
 * Evaluates a SQL {@code [NOT] LIKE} expression. The left operand is the value to test, the right
 * operand is the LIKE pattern, and an optional {@code ESCAPE} operand supplies the escape character
 * (backslash when absent or empty). In the pattern, {@code _} matches exactly one character and
 * {@code %} matches any sequence of characters, including none. Matching is case-insensitive.
 */
@Slf4j
@TransformParser(values = LikeExpression.class)
public class LikeParser implements ValueParser {

    /** Characters that are metacharacters in a Java regex and therefore need a leading backslash. */
    private static final String REGEX_SPECIAL_CHAR = "[]()|^-+*?{}$\\.";

    /** Escape character used when the expression has no usable ESCAPE clause. */
    private static final char DEFAULT_ESCAPE_CHAR = '\\';

    /**
     * Flags applied to every generated regex: case-insensitive for all Unicode letters
     * (independent of the JVM default locale), and DOTALL so that both wildcards also
     * match line terminators.
     */
    private static final int MATCH_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE | Pattern.DOTALL;

    private final ValueParser destParser;
    private final ValueParser patternParser;
    private final ValueParser escapeParser;
    private final boolean isNot;

    /**
     * Builds the operand parsers from the parsed LIKE expression.
     *
     * @param expr the LIKE expression; its escape operand may be absent
     */
    public LikeParser(LikeExpression expr) {
        destParser = OperatorTools.buildParser(expr.getLeftExpression());
        patternParser = OperatorTools.buildParser(expr.getRightExpression());
        // May be null when the expression has no ESCAPE clause; parse() checks for that.
        escapeParser = OperatorTools.buildParser(expr.getEscape());
        isNot = expr.isNot();
    }

    /**
     * Evaluates the LIKE predicate for one row.
     *
     * @return {@code null} when either operand evaluates to {@code null} or the pattern cannot be
     *         evaluated (the failure is logged); otherwise a {@code Boolean} holding the match
     *         result, negated for {@code NOT LIKE}
     */
    @Override
    public Object parse(SourceData sourceData, int rowIndex, Context context) {
        Object destObj = destParser.parse(sourceData, rowIndex, context);
        Object patternObj = patternParser.parse(sourceData, rowIndex, context);
        if (destObj == null || patternObj == null) {
            return null;
        }
        char escapeChr = DEFAULT_ESCAPE_CHAR;
        if (escapeParser != null) {
            Object escapeObj = this.escapeParser.parse(sourceData, rowIndex, context);
            if (!StringUtil.isEmpty(escapeObj)) {
                // Only the first character of the ESCAPE operand is used.
                escapeChr = escapeObj.toString().charAt(0);
            }
        }
        String destStr = destObj.toString();
        String pattern = patternObj.toString();
        try {
            final String regex = buildLikeRegex(pattern, escapeChr);
            // Case folding is delegated to the regex engine instead of lower-casing both strings:
            // String.toLowerCase() depends on the default locale and may change the string length.
            boolean isMatch = Pattern.compile(regex, MATCH_FLAGS).matcher(destStr).matches();
            // XOR with isNot: flips the result for NOT LIKE, leaves it unchanged for LIKE.
            return isNot != isMatch;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Translates a LIKE pattern into an equivalent Java regular expression, to be compiled with
     * {@link #MATCH_FLAGS}.
     *
     * @param pattern the LIKE pattern
     * @param escapeChar the character that makes a following {@code _}, {@code %} or escape
     *        character literal
     * @return the regular expression source
     * @throws IllegalArgumentException when the escape character is followed by anything other
     *         than {@code _}, {@code %} or itself
     */
    private String buildLikeRegex(String pattern, char escapeChar) {
        int len = pattern.length();
        StringBuilder regexPattern = new StringBuilder(len + len);
        for (int i = 0; i < len; i++) {
            char c = pattern.charAt(i);
            if (c == escapeChar) {
                if (i == len - 1) {
                    // At the end of a string, the escape character represents itself
                    appendLiteral(regexPattern, c);
                    continue;
                }
                char nextChar = pattern.charAt(i + 1);
                if (nextChar != '_' && nextChar != '%' && nextChar != escapeChar) {
                    throw new IllegalArgumentException("Illegal pattern string");
                }
                // The escaped character is taken literally; skip over it.
                appendLiteral(regexPattern, nextChar);
                i++;
            } else if (c == '_') {
                // Exactly one character; DOTALL lets it match line terminators too.
                regexPattern.append('.');
            } else if (c == '%') {
                regexPattern.append(".*");
            } else {
                appendLiteral(regexPattern, c);
            }
        }
        return regexPattern.toString();
    }

    /**
     * Appends a character that must match itself, prefixing it with a backslash when it is a regex
     * metacharacter.
     */
    private static void appendLiteral(StringBuilder regexPattern, char c) {
        if (REGEX_SPECIAL_CHAR.indexOf(c) >= 0) {
            regexPattern.append('\\');
        }
        regexPattern.append(c);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ private static void init() {
}

public static ValueParser getTransformParser(Expression expr) {
if (expr == null) {
return null;
}
Class<?> clazz = parserMap.get(expr.getClass());
if (clazz == null) {
return new ColumnParser((Column) expr);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process;

import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * TestLikeParserProcessor
 * description: exercises LIKE and NOT LIKE through the transform processor, using a CSV source
 * with two columns (string1 = value, string2 = pattern) and a KV sink with one field (result).
 */
public class TestLikeParserProcessor {

    private static final List<FieldInfo> srcFields = new ArrayList<>();
    private static final List<FieldInfo> dstFields = new ArrayList<>();
    private static final CsvSourceInfo csvSource;
    private static final KvSinkInfo kvSink;

    static {
        // source columns: string1, string2
        for (int idx = 1; idx <= 2; idx++) {
            FieldInfo srcField = new FieldInfo();
            srcField.setName("string" + idx);
            srcFields.add(srcField);
        }
        // single sink column: result
        FieldInfo resultField = new FieldInfo();
        resultField.setName("result");
        dstFields.add(resultField);
        csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
        kvSink = new KvSinkInfo("UTF-8", dstFields);
    }

    /**
     * Creates a processor for the given SQL on top of the shared CSV source and KV sink.
     */
    private static TransformProcessor<String, String> buildProcessor(String transformSql) throws Exception {
        TransformConfig config = new TransformConfig(transformSql);
        return TransformProcessor.create(config,
                SourceDecoderFactory.createCsvDecoder(csvSource),
                SinkEncoderFactory.createKvEncoder(kvSink));
    }

    /**
     * Transforms one input row and checks that exactly one output row equal to {@code expected} is produced.
     */
    private static void assertSingleResult(TransformProcessor<String, String> processor, String input,
            String expected) throws Exception {
        List<String> output = processor.transform(input, new HashMap<>());
        Assert.assertEquals(1, output.size());
        Assert.assertEquals(expected, output.get(0));
    }

    @Test
    public void testLikeFunction() throws Exception {
        TransformProcessor<String, String> plainLike = buildProcessor("select string1 like string2 from source");
        // case1: apple like %App%
        assertSingleResult(plainLike, "apple|%App%", "result=true");

        // '/' is used as the LIKE escape character instead of '\' because the CSV source is
        // configured with '\' as a special character, so a backslash in the input row would
        // presumably be consumed by the CSV decoder before reaching the LIKE parser.
        TransformProcessor<String, String> escapedLike =
                buildProcessor("select string1 like string2 ESCAPE '/' from source");

        // case2: apple like /%App% ESCAPE '/'
        assertSingleResult(escapedLike, "apple|/%App%", "result=false");

        // case3: %apple like /%App% ESCAPE '/'
        assertSingleResult(escapedLike, "%apple|/%App%", "result=true");

        // case4: %apple like /%Apple_ ESCAPE '/'
        assertSingleResult(escapedLike, "%apple|/%Apple_", "result=false");

        // case5: %apple like /%Appl_ ESCAPE '/'
        assertSingleResult(escapedLike, "%apple|/%Appl_", "result=true");

        // case6: %ap_ple like /%Ap%_e ESCAPE '/'
        assertSingleResult(escapedLike, "%ap_ple|/%Ap%_e", "result=true");

        // case7: %ap_ple/ like /%Ap%_e/ ESCAPE '/'
        assertSingleResult(escapedLike, "%ap_ple/|/%Ap%_e/", "result=true");
    }

    @Test
    public void testNotLikeFunction() throws Exception {
        TransformProcessor<String, String> plainNotLike =
                buildProcessor("select string1 not like string2 from source");
        // case1: apple not like %App%
        assertSingleResult(plainNotLike, "apple|%App%", "result=false");

        // '/' is used as the LIKE escape character instead of '\' because the CSV source is
        // configured with '\' as a special character, so a backslash in the input row would
        // presumably be consumed by the CSV decoder before reaching the LIKE parser.
        TransformProcessor<String, String> escapedNotLike =
                buildProcessor("select string1 not like string2 ESCAPE '/' from source");

        // case2: apple not like /%App% ESCAPE '/'
        assertSingleResult(escapedNotLike, "apple|/%App%", "result=true");

        // case3: %apple not like /%App% ESCAPE '/'
        assertSingleResult(escapedNotLike, "%apple|/%App%", "result=false");

        // case4: %apple not like /%Apple_ ESCAPE '/'
        assertSingleResult(escapedNotLike, "%apple|/%Apple_", "result=true");

        // case5: %apple not like /%Appl_ ESCAPE '/'
        assertSingleResult(escapedNotLike, "%apple|/%Appl_", "result=false");

        // case6: %ap_ple not like /%Ap%_e ESCAPE '/'
        assertSingleResult(escapedNotLike, "%ap_ple|/%Ap%_e", "result=false");

        // case7: %ap_ple/ not like /%Ap%_e/ ESCAPE '/'
        assertSingleResult(escapedNotLike, "%ap_ple/|/%Ap%_e/", "result=false");
    }

}

0 comments on commit 585dae1

Please sign in to comment.