diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateAddFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateAddFunction.java index 11fbfcda38f..28c8d83d40a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateAddFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateAddFunction.java @@ -21,15 +21,13 @@ import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.pojo.IntervalInfo; import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; -import org.apache.commons.lang3.tuple.Pair; -import java.time.temporal.ChronoField; import java.util.List; -import java.util.Map; /** * DateAddFunction @@ -50,8 +48,8 @@ @TransformFunction(names = {"date_add"}) public class DateAddFunction implements ValueParser { - private ValueParser datetimeParser; - private ValueParser intervalParser; + private final ValueParser datetimeParser; + private final ValueParser intervalParser; public DateAddFunction(Function expr) { List expressions = expr.getParameters().getExpressions(); @@ -61,12 +59,12 @@ public DateAddFunction(Function expr) { @Override public Object parse(SourceData sourceData, int rowIndex, Context context) { - Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context); + Object intervalInfoObj = intervalParser.parse(sourceData, rowIndex, context); Object dateObj = datetimeParser.parse(sourceData, rowIndex, context); - if (intervalPairObj == null || dateObj == null) { + if (intervalInfoObj == null || dateObj == null) { return null; } - return DateUtil.dateAdd(OperatorTools.parseString(dateObj), - (Pair>) intervalPairObj, 1); + return DateUtil.dateTypeAdd(OperatorTools.parseString(dateObj), + (IntervalInfo) intervalInfoObj, true); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java index 77b2c93dfc6..7ce47d4d70d 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java @@ -21,15 +21,15 @@ import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Objects; /** * DateDiffFunction @@ -43,9 +43,6 @@ public class DateDiffFunction implements ValueParser { private final ValueParser leftDateParser; private final ValueParser rightDateParser; - private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd"); public DateDiffFunction(Function expr) { List expressions = expr.getParameters().getExpressions(); @@ -66,24 +63,11 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { return null; } try { - LocalDate left = getLocalDate(leftDate); - LocalDate right = getLocalDate(rightDate); + LocalDate left = Objects.requireNonNull(DateUtil.parseLocalDateTime(leftDate)).toLocalDate(); + LocalDate right = Objects.requireNonNull(DateUtil.parseLocalDateTime(rightDate)).toLocalDate(); return ChronoUnit.DAYS.between(right, left); } catch (Exception e) { return null; } } - - public LocalDate getLocalDate(String dateString) { - DateTimeFormatter formatter = null; - LocalDate dateTime = null; - if (dateString.indexOf(' ') != -1) { - formatter = DEFAULT_FORMAT_DATE_TIME; - dateTime = LocalDateTime.parse(dateString, formatter).toLocalDate(); - } else { - formatter = DEFAULT_FORMAT_DATE; - dateTime = LocalDate.parse(dateString, formatter); - } - return dateTime; - } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateSubFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateSubFunction.java index dfa14efee92..10336420460 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateSubFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateSubFunction.java @@ -21,15 +21,13 @@ import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.pojo.IntervalInfo; import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; -import org.apache.commons.lang3.tuple.Pair; -import java.time.temporal.ChronoField; import java.util.List; -import java.util.Map; /** * DateAddFunction @@ -50,8 +48,8 @@ @TransformFunction(names = {"date_sub", "datesub"}) public class DateSubFunction implements ValueParser { - private ValueParser datetimeParser; - private ValueParser intervalParser; + private final ValueParser datetimeParser; + private final ValueParser intervalParser; public DateSubFunction(Function expr) { List expressions = expr.getParameters().getExpressions(); @@ -61,12 +59,12 @@ public DateSubFunction(Function expr) { @Override public Object parse(SourceData sourceData, int rowIndex, Context context) { - Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context); + Object intervalInfoObj = intervalParser.parse(sourceData, rowIndex, context); Object dateObj = datetimeParser.parse(sourceData, rowIndex, context); - if (intervalPairObj == null || dateObj == null) { + if (intervalInfoObj == null || dateObj == null) { return null; } - return DateUtil.dateAdd(OperatorTools.parseString(dateObj), - (Pair>) intervalPairObj, -1); + return DateUtil.dateTypeAdd(OperatorTools.parseString(dateObj), + (IntervalInfo) intervalInfoObj, false); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java index 29b4636d81d..c5e55f87253 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java @@ -21,30 +21,26 @@ import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; -import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.List; /** * TimestampAddFunction * Description: Add integer expression intervals to the date or date time expression expr. * The unit of the time interval is specified by the unit parameter, which should be one of the following values: - * FRAC_SECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR. + * MICROSECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR. */ @TransformFunction(names = {"timestamp_add", "timestampadd"}) public class TimestampAddFunction implements ValueParser { - private ValueParser intervalParser; - private ValueParser amountParser; - private ValueParser datetimeParser; - private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private final ValueParser intervalParser; + private final ValueParser amountParser; + private final ValueParser datetimeParser; public TimestampAddFunction(Function expr) { List expressions = expr.getParameters().getExpressions(); @@ -62,22 +58,18 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { } private String evalDate(String dateString, String interval, Long amount) { - DateTimeFormatter formatter = null; - LocalDateTime dateTime = null; - boolean hasTime = true; - if (dateString.indexOf(' ') != -1) { - formatter = DEFAULT_FORMAT_DATE_TIME; - dateTime = LocalDateTime.parse(dateString, formatter); - } else { - formatter = DEFAULT_FORMAT_DATE; - dateTime = LocalDate.parse(dateString, formatter).atStartOfDay(); - hasTime = false; + LocalDateTime dateTime = DateUtil.parseLocalDateTime(dateString); + if (dateTime == null) { + return null; } + boolean hasTime = dateString.indexOf(' ') != -1; + boolean hasMicro = dateString.indexOf('.') != -1; switch (interval.toUpperCase()) { - case "FRAC_SECOND": + case "MICROSECOND": hasTime = true; - dateTime = dateTime.plusNanos(amount * 1000_000); + hasMicro = true; + dateTime = dateTime.plusNanos(amount * 1000); break; case "SECOND": hasTime = true; @@ -107,12 +99,13 @@ private String evalDate(String dateString, String interval, Long amount) { dateTime = dateTime.plusYears(amount); break; } - - String result = dateTime.toLocalDate().toString(); + StringBuilder format = new StringBuilder("yyyy-MM-dd"); if (hasTime) { - result += " " + dateTime.toLocalTime().format(DateTimeFormatter.ofPattern("HH:mm:ss")); + format.append(" HH:mm:ss"); } - - return result; + if (hasMicro) { + format.append(".SSSSSS"); + } + return dateTime.format(DateUtil.getDateTimeFormatter(format.toString())); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampDiffFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampDiffFunction.java index ebc045db22d..085659b4eda 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampDiffFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampDiffFunction.java @@ -21,15 +21,15 @@ import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; -import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Objects; /** * TimestampDiffFunction -> TIMESTAMPDIFF(unit,datetime_expr1,datetime_expr2) @@ -41,14 +41,9 @@ @TransformFunction(names = {"timestamp_diff", "timestampdiff"}) public class TimestampDiffFunction implements ValueParser { - private ValueParser unitParser; - private ValueParser firstDateTimeParser; - private ValueParser secondDateTimeParser; - private static final DateTimeFormatter DEFAULT_FORMAT_DATE_MRO_TIME = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); - private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private final ValueParser unitParser; + private final ValueParser firstDateTimeParser; + private final ValueParser secondDateTimeParser; public TimestampDiffFunction(Function expr) { List expressions = expr.getParameters().getExpressions(); @@ -72,8 +67,8 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { return null; } try { - LocalDateTime left = getLocalDate(firstDateTime); - LocalDateTime right = getLocalDate(secondDateTime); + LocalDateTime left = Objects.requireNonNull(DateUtil.parseLocalDateTime(firstDateTime)); + LocalDateTime right = Objects.requireNonNull(DateUtil.parseLocalDateTime(secondDateTime)); switch (unit) { case "MICROSECOND": return ChronoUnit.MICROS.between(left, right); @@ -100,20 +95,4 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { return null; } } - - public LocalDateTime getLocalDate(String dateString) { - DateTimeFormatter formatter = null; - LocalDateTime dateTime = null; - if (dateString.indexOf('.') != -1) { - formatter = DEFAULT_FORMAT_DATE_MRO_TIME; - dateTime = LocalDateTime.parse(dateString, formatter); - } else if (dateString.indexOf(' ') != -1) { - formatter = DEFAULT_FORMAT_DATE_TIME; - dateTime = LocalDateTime.parse(dateString, formatter); - } else { - formatter = DEFAULT_FORMAT_DATE; - dateTime = LocalDate.parse(dateString, formatter).atStartOfDay(); - } - return dateTime; - } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java index eea6ce7e17e..54e2f0e8ce9 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java @@ -20,17 +20,16 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.pojo.IntervalInfo; import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.operators.arithmetic.Addition; -import org.apache.commons.lang3.tuple.Pair; import java.math.BigDecimal; -import java.time.temporal.ChronoField; -import java.util.Map; /** * AdditionParser + * Description: Support the addition of numerical values and time */ @TransformParser(values = Addition.class) public class AdditionParser implements ValueParser { @@ -49,8 +48,8 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { if (this.left instanceof IntervalParser && this.right instanceof IntervalParser) { return null; } else if (this.left instanceof IntervalParser || this.right instanceof IntervalParser) { - IntervalParser intervalParser = null; - ValueParser dateParser = null; + IntervalParser intervalParser; + ValueParser dateParser; if (this.left instanceof IntervalParser) { intervalParser = (IntervalParser) this.left; dateParser = this.right; @@ -58,13 +57,13 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { intervalParser = (IntervalParser) this.right; dateParser = this.left; } - Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context); + Object intervalInfoObj = intervalParser.parse(sourceData, rowIndex, context); Object dateObj = dateParser.parse(sourceData, rowIndex, context); - if (intervalPairObj == null || dateObj == null) { + if (intervalInfoObj == null || dateObj == null) { return null; } - return DateUtil.dateAdd(OperatorTools.parseString(dateObj), - (Pair>) intervalPairObj, 1); + return DateUtil.dateTypeAdd(OperatorTools.parseString(dateObj), + (IntervalInfo) intervalInfoObj, true); } else { return numericalOperation(sourceData, rowIndex, context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java index 7266dcd63b8..a79e63ea2f2 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java @@ -20,22 +20,17 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.utils.DateUtil; import lombok.extern.slf4j.Slf4j; import net.sf.jsqlparser.expression.IntervalExpression; -import org.apache.commons.lang3.tuple.Pair; import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoField; -import java.time.temporal.TemporalAccessor; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * IntervalParser <-> INTERVAL expr unit -> Pair(factor,Map(ChronoField,Count)): + * IntervalParser <-> INTERVAL expr unit -> IntervalInfo: *

* `factor`: *

@@ -62,11 +57,6 @@ public class IntervalParser implements ValueParser { private final String intervalType; private final ValueParser dateParser; private final String parameter; - - private static final List CHRONO_FIELD_LIST = Arrays.asList(ChronoField.YEAR, - ChronoField.MONTH_OF_YEAR, - ChronoField.DAY_OF_MONTH, ChronoField.HOUR_OF_DAY, ChronoField.MINUTE_OF_HOUR, ChronoField.SECOND_OF_MINUTE, - ChronoField.MICRO_OF_SECOND); private static final Map DT_FORMATTER_MAP = new ConcurrentHashMap<>(); static { @@ -105,13 +95,13 @@ public IntervalParser(IntervalExpression expr) { public Object parse(SourceData sourceData, int rowIndex, Context context) { DateTimeFormatter dateTimeFormatter = DT_FORMATTER_MAP.get(intervalType); - String dataStr = parameter; + String dateStr = parameter; if (dateParser != null) { Object dateObj = dateParser.parse(sourceData, rowIndex, context); if (dateObj == null) { return null; } - dataStr = OperatorTools.parseString(dateObj); + dateStr = OperatorTools.parseString(dateObj); } int factor = 1; @@ -128,24 +118,11 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { } try { - factor = dataStr.charAt(0) == '-' ? -factor : factor; + factor = dateStr.charAt(0) == '-' ? -factor : factor; if (factor < 0) { - dataStr = dataStr.substring(1); - } - TemporalAccessor temporalAccessor = dateTimeFormatter.parse(dataStr); - HashMap map = new HashMap<>(); - for (ChronoField field : CHRONO_FIELD_LIST) { - try { - long num = temporalAccessor.getLong(field); - if (num == 0) { - continue; - } - map.put(field, temporalAccessor.getLong(field)); - } catch (Exception ignored) { - - } + dateStr = dateStr.substring(1); } - return Pair.of(factor, map); + return DateUtil.parseInterInfo(dateTimeFormatter, dateStr, factor); } catch (Exception e) { log.error("Interval parse error", e); return null; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java index cf32f1694cd..900c5ff9464 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java @@ -20,24 +20,23 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.pojo.IntervalInfo; import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; -import org.apache.commons.lang3.tuple.Pair; import java.math.BigDecimal; -import java.time.temporal.ChronoField; -import java.util.Map; /** * SubtractionParser + * Description: Support subtraction between numerical values and time */ @TransformParser(values = Subtraction.class) public class SubtractionParser implements ValueParser { - private ValueParser left; + private final ValueParser left; - private ValueParser right; + private final ValueParser right; public SubtractionParser(Subtraction expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -58,13 +57,13 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { intervalParser = (IntervalParser) this.right; dateParser = this.left; } - Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context); + Object intervalInfoObj = intervalParser.parse(sourceData, rowIndex, context); Object dateObj = dateParser.parse(sourceData, rowIndex, context); - if (intervalPairObj == null || dateObj == null) { + if (intervalInfoObj == null || dateObj == null) { return null; } - return DateUtil.dateAdd(OperatorTools.parseString(dateObj), - (Pair>) intervalPairObj, -1); + return DateUtil.dateTypeAdd(OperatorTools.parseString(dateObj), + (IntervalInfo) intervalInfoObj, false); } else { return numericalOperation(sourceData, rowIndex, context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/pojo/IntervalInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/pojo/IntervalInfo.java new file mode 100644 index 00000000000..8158b7ee5a5 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/pojo/IntervalInfo.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.pojo; + +import lombok.Data; + +import java.time.temporal.ChronoField; +import java.util.HashMap; + +@Data +public class IntervalInfo { + + private int factor = 1; + private HashMap chronoMap; + + public IntervalInfo() { + } + + public IntervalInfo(HashMap chronoMap) { + this.chronoMap = chronoMap; + } + + public IntervalInfo(int factor, HashMap chronoMap) { + this.factor = factor; + this.chronoMap = chronoMap; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java index e703bb543f2..6a8a1bd9726 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java @@ -17,18 +17,27 @@ package org.apache.inlong.sdk.transform.process.utils; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.inlong.sdk.transform.process.pojo.IntervalInfo; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoField; +import java.time.temporal.TemporalAccessor; +import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; public class DateUtil { + private static final List CHRONO_FIELD_LIST = Arrays.asList(ChronoField.YEAR, + ChronoField.MONTH_OF_YEAR, + ChronoField.DAY_OF_MONTH, ChronoField.HOUR_OF_DAY, ChronoField.MINUTE_OF_HOUR, ChronoField.SECOND_OF_MINUTE, + ChronoField.MICRO_OF_SECOND); + // Need to follow this order private static final Map DATE_TIME_FORMATTER_MAP = new LinkedHashMap<>(); private static final Map TIME_FORMATTER_MAP = new LinkedHashMap<>(); @@ -51,27 +60,18 @@ public class DateUtil { * Time calculation * * @param dateStr Time parameter string - * @param intervalPair Interval parsing results - * @param sign If the sign is positive or negative, it indicates addition or subtraction + * @param intervalInfo Interval parsing results + * @param isPositive True is positive, false is negative * @return Calculation result string */ - public static String dateAdd(String dateStr, Pair> intervalPair, int sign) { - - if (sign < 0) { - sign = -1; - } else if (sign > 0) { - sign = 1; - } else { - return null; - } - + public static String dateTypeAdd(String dateStr, IntervalInfo intervalInfo, boolean isPositive) { Object dateParserObj = parseLocalDateTime(dateStr); if (dateParserObj != null) { - return addDateTime(intervalPair, sign, (LocalDateTime) dateParserObj, dateStr); + return addDateTime(dateStr, (LocalDateTime) dateParserObj, intervalInfo, isPositive); } dateParserObj = parseLocalTime(dateStr); if (dateParserObj != null) { - return addTime(intervalPair, sign, (LocalTime) dateParserObj, dateStr); + return addTime(dateStr, (LocalTime) dateParserObj, intervalInfo, isPositive); } return null; } @@ -117,6 +117,23 @@ public static LocalTime parseLocalTime(String dateStr) { return null; } + public static IntervalInfo parseInterInfo(DateTimeFormatter dateTimeFormatter, String dateStr, int factor) { + TemporalAccessor temporalAccessor = dateTimeFormatter.parse(dateStr); + HashMap map = new HashMap<>(); + for (ChronoField field : CHRONO_FIELD_LIST) { + try { + long num = temporalAccessor.getLong(field); + if (num == 0) { + continue; + } + map.put(field, temporalAccessor.getLong(field)); + } catch (Exception ignored) { + + } + } + return new IntervalInfo(factor, map); + } + public static DateTimeFormatter getDateTimeFormatter(String formatStr) { DateTimeFormatter formatter = DATE_TIME_FORMATTER_MAP.get(formatStr); if (formatter != null) { @@ -125,13 +142,23 @@ public static DateTimeFormatter getDateTimeFormatter(String formatStr) { return TIME_FORMATTER_MAP.get(formatStr); } - private static String addDateTime(Pair> intervalPair, int sign, - LocalDateTime dateTime, String dataStr) { - int factor = intervalPair.getKey(); - Map valueMap = intervalPair.getValue(); + /** + * + * @param dateStr The first time string + * @param dateTime It is obtained by parsing dateStr + * @param intervalInfo addend + * @param isPositive True is positive, false is negative + * @return + */ + public static String addDateTime(String dateStr, LocalDateTime dateTime, + IntervalInfo intervalInfo, boolean isPositive) { + int factor = intervalInfo.getFactor(); + Map valueMap = intervalInfo.getChronoMap(); + + int sign = isPositive ? 1 : -1; - boolean hasTime = dataStr.indexOf(' ') != -1; - boolean hasMicroSecond = dataStr.indexOf('.') != -1; + boolean hasTime = dateStr.indexOf(' ') != -1; + boolean hasMicroSecond = dateStr.indexOf('.') != -1; for (ChronoField field : valueMap.keySet()) { long amount = valueMap.get(field) * factor * sign; @@ -176,12 +203,21 @@ private static String addDateTime(Pair> interval return dateTime.toLocalDate().toString(); } - private static String addTime(Pair> intervalPair, int sign, LocalTime time, - String dataStr) { - int factor = intervalPair.getKey(); - Map valueMap = intervalPair.getValue(); + /** + * + * @param dateStr The first time string + * @param time It is obtained by parsing dateStr + * @param intervalInfo addend + * @param isPositive True is positive, false is negative + * @return + */ + public static String addTime(String dateStr, LocalTime time, + IntervalInfo intervalInfo, boolean isPositive) { + int factor = intervalInfo.getFactor(); + Map valueMap = intervalInfo.getChronoMap(); + boolean hasMicroSecond = dateStr.indexOf('.') != -1; - boolean hasMicroSecond = dataStr.indexOf('.') != -1; + int sign = isPositive ? 1 : -1; for (ChronoField field : valueMap.keySet()) { long amount = valueMap.get(field) * factor * sign; diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampAdd.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampAddFunction.java similarity index 74% rename from inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampAdd.java rename to inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampAddFunction.java index c2f7e1ac56c..245ee050c5d 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampAdd.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampAddFunction.java @@ -28,10 +28,10 @@ import java.util.HashMap; import java.util.List; -public class TestTimestampAdd extends AbstractFunctionTemporalTestBase { +public class TestTimestampAddFunction extends AbstractFunctionTemporalTestBase { @Test - public void testTimestampAdd() throws Exception { + public void testTimestampAddFunction() throws Exception { String transformSql1 = "select timestamp_add('day',string2,string1) from source"; TransformConfig config1 = new TransformConfig(transformSql1); TransformProcessor processor1 = TransformProcessor @@ -67,5 +67,21 @@ public void testTimestampAdd() throws Exception { List output5 = processor2.transform("1970-01-01|-3", new HashMap<>()); Assert.assertEquals(1, output5.size()); Assert.assertEquals("result=1969-12-31 23:57:00", output5.get(0)); + + String transformSql3 = "select timestamp_add('MICROSECOND',string2,string1) from source"; + TransformConfig config3 = new TransformConfig(transformSql3); + TransformProcessor processor3 = TransformProcessor + .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: timestamp_add('MICROSECOND',3,'1970-01-01 00:00:44.000001') + List output6 = processor3.transform("1970-01-01 00:00:44.000001|3", new HashMap<>()); + Assert.assertEquals(1, output6.size()); + Assert.assertEquals("result=1970-01-01 00:00:44.000004", output6.get(0)); + + // case7: timestamp_add('MICROSECOND',3,'1970-01-01 00:00:44') + List output7 = processor3.transform("1970-01-01 00:00:44|3", new HashMap<>()); + Assert.assertEquals(1, output7.size()); + Assert.assertEquals("result=1970-01-01 00:00:44.000003", output7.get(0)); } }