diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java index fbc93f4e913..1b3eadc7f22 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java @@ -98,11 +98,11 @@ public static List createFilterFunctions(FilterDefinition filter .map(filterRule -> createFilterFunction(filterRule, transformName)).collect(Collectors.toList()); // Move logicOperator to preFunction for (int index = filterFunctions.size() - 1; index > 0; index--) { - SingleValueFilterFunction function = (SingleValueFilterFunction) filterFunctions.get(index); - SingleValueFilterFunction preFunction = (SingleValueFilterFunction) filterFunctions.get(index - 1); + FilterFunction function = filterFunctions.get(index); + FilterFunction preFunction = filterFunctions.get(index - 1); function.setLogicOperator(preFunction.getLogicOperator()); } - ((SingleValueFilterFunction) filterFunctions.get(0)).setLogicOperator(EmptyOperator.getInstance()); + (filterFunctions.get(0)).setLogicOperator(EmptyOperator.getInstance()); return filterFunctions; } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java index bc07a89d80d..77efea72b95 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java @@ -21,6 +21,8 @@ import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; +import lombok.Data; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; @@ -33,6 +35,9 @@ @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter"), @JsonSubTypes.Type(value = BetweenFunction.class, name = "betweenFunction") }) -public interface FilterFunction extends Function { +@Data +public abstract class FilterFunction implements Function { + @JsonProperty("logicOperator") + protected LogicOperator logicOperator; } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java index 3a4d6390748..28d0a0c371e 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java @@ -30,6 +30,7 @@ import javax.annotation.Nonnull; +import java.io.Serializable; import java.util.Arrays; import java.util.List; @@ -38,7 +39,7 @@ */ @JsonTypeName("betweenFunction") @Data -public class BetweenFunction implements FilterFunction { +public class BetweenFunction extends FilterFunction implements Serializable { @Nonnull @JsonProperty("field") @@ -49,9 +50,6 @@ public class BetweenFunction implements FilterFunction { @Nonnull @JsonProperty("end") private final FunctionParam end; - @Nonnull - @JsonProperty("logicOperator") - private final LogicOperator logicOperator; @JsonCreator public BetweenFunction( diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java index ef1945c7918..6db7cefe817 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java @@ -30,6 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; +import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -40,7 +41,7 @@ @JsonTypeName("multiValueFilter") @Data @NoArgsConstructor -public class MultiValueFilterFunction implements FilterFunction { +public class MultiValueFilterFunction extends FilterFunction implements Serializable { @JsonProperty("source") private FunctionParam source; @@ -48,8 +49,6 @@ public class MultiValueFilterFunction implements FilterFunction { private List targets; @JsonProperty("compareOperator") private MultiValueCompareOperator compareOperator; - @JsonProperty("logicOperator") - private LogicOperator logicOperator; @JsonCreator public MultiValueFilterFunction( diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java index cfb743a1e0a..eac6a2c3cf2 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java @@ -39,7 +39,7 @@ @JsonTypeName("singleValueFilter") @Data @NoArgsConstructor -public class SingleValueFilterFunction implements FilterFunction, Serializable { +public class SingleValueFilterFunction extends FilterFunction implements Serializable { private static final long serialVersionUID = 8953419088907830331L; @@ -49,8 +49,6 @@ public class SingleValueFilterFunction implements FilterFunction, Serializable { private FunctionParam target; @JsonProperty("compareOperator") private SingleValueCompareOperator compareOperator; - @JsonProperty("logicOperator") - private LogicOperator logicOperator; @JsonCreator public SingleValueFilterFunction(