Skip to content

Commit

Permalink
[INLONG-11192][SDK] Transform SQL supports TIMEDIFF function
Browse files Browse the repository at this point in the history
  • Loading branch information
ZKpLo committed Sep 26, 2024
1 parent b670373 commit d376a1f
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import org.apache.inlong.sdk.transform.process.pojo.IntervalInfo;
import org.apache.inlong.sdk.transform.process.utils.DateUtil;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;
import org.apache.commons.lang3.tuple.Pair;

import java.time.temporal.ChronoField;
import java.util.List;
import java.util.Map;

/**
* DateAddFunction
Expand All @@ -50,8 +48,8 @@
@TransformFunction(names = {"date_add"})
public class DateAddFunction implements ValueParser {

private ValueParser datetimeParser;
private ValueParser intervalParser;
private final ValueParser datetimeParser;
private final ValueParser intervalParser;

public DateAddFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
Expand All @@ -61,12 +59,12 @@ public DateAddFunction(Function expr) {

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context);
Object intervalInfoObj = intervalParser.parse(sourceData, rowIndex, context);
Object dateObj = datetimeParser.parse(sourceData, rowIndex, context);
if (intervalPairObj == null || dateObj == null) {
if (intervalInfoObj == null || dateObj == null) {
return null;
}
return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
(Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, 1);
return DateUtil.dateTypeAdd(OperatorTools.parseString(dateObj),
(IntervalInfo) intervalInfoObj, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import org.apache.inlong.sdk.transform.process.utils.DateUtil;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Objects;

/**
* DateDiffFunction
Expand All @@ -43,9 +43,6 @@ public class DateDiffFunction implements ValueParser {

private final ValueParser leftDateParser;
private final ValueParser rightDateParser;
private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");

public DateDiffFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
Expand All @@ -66,24 +63,11 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) {
return null;
}
try {
LocalDate left = getLocalDate(leftDate);
LocalDate right = getLocalDate(rightDate);
LocalDate left = Objects.requireNonNull(DateUtil.parseLocalDateTime(leftDate)).toLocalDate();
LocalDate right = Objects.requireNonNull(DateUtil.parseLocalDateTime(rightDate)).toLocalDate();
return ChronoUnit.DAYS.between(right, left);
} catch (Exception e) {
return null;
}
}

public LocalDate getLocalDate(String dateString) {
DateTimeFormatter formatter = null;
LocalDate dateTime = null;
if (dateString.indexOf(' ') != -1) {
formatter = DEFAULT_FORMAT_DATE_TIME;
dateTime = LocalDateTime.parse(dateString, formatter).toLocalDate();
} else {
formatter = DEFAULT_FORMAT_DATE;
dateTime = LocalDate.parse(dateString, formatter);
}
return dateTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import org.apache.inlong.sdk.transform.process.pojo.IntervalInfo;
import org.apache.inlong.sdk.transform.process.utils.DateUtil;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;
import org.apache.commons.lang3.tuple.Pair;

import java.time.temporal.ChronoField;
import java.util.List;
import java.util.Map;

/**
* DateAddFunction
Expand All @@ -50,8 +48,8 @@
@TransformFunction(names = {"date_sub", "datesub"})
public class DateSubFunction implements ValueParser {

private ValueParser datetimeParser;
private ValueParser intervalParser;
private final ValueParser datetimeParser;
private final ValueParser intervalParser;

public DateSubFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
Expand All @@ -61,12 +59,12 @@ public DateSubFunction(Function expr) {

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context);
Object intervalInfoObj = intervalParser.parse(sourceData, rowIndex, context);
Object dateObj = datetimeParser.parse(sourceData, rowIndex, context);
if (intervalPairObj == null || dateObj == null) {
if (intervalInfoObj == null || dateObj == null) {
return null;
}
return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
(Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, -1);
return DateUtil.dateTypeAdd(OperatorTools.parseString(dateObj),
(IntervalInfo) intervalInfoObj, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,26 @@
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import org.apache.inlong.sdk.transform.process.utils.DateUtil;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
* TimestampAddFunction
* Description: Add integer expression intervals to the date or date time expression expr.
* The unit of the time interval is specified by the unit parameter, which should be one of the following values:
* FRAC_SECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR.
* MICROSECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR.
*/
@TransformFunction(names = {"timestamp_add", "timestampadd"})
public class TimestampAddFunction implements ValueParser {

private ValueParser intervalParser;
private ValueParser amountParser;
private ValueParser datetimeParser;
private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private final ValueParser intervalParser;
private final ValueParser amountParser;
private final ValueParser datetimeParser;

public TimestampAddFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
Expand All @@ -62,22 +58,18 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) {
}

private String evalDate(String dateString, String interval, Long amount) {
DateTimeFormatter formatter = null;
LocalDateTime dateTime = null;
boolean hasTime = true;
if (dateString.indexOf(' ') != -1) {
formatter = DEFAULT_FORMAT_DATE_TIME;
dateTime = LocalDateTime.parse(dateString, formatter);
} else {
formatter = DEFAULT_FORMAT_DATE;
dateTime = LocalDate.parse(dateString, formatter).atStartOfDay();
hasTime = false;
LocalDateTime dateTime = DateUtil.parseLocalDateTime(dateString);
if (dateTime == null) {
return null;
}
boolean hasTime = dateString.indexOf(' ') != -1;
boolean hasMicro = dateString.indexOf('.') != -1;

switch (interval.toUpperCase()) {
case "FRAC_SECOND":
case "MICROSECOND":
hasTime = true;
dateTime = dateTime.plusNanos(amount * 1000_000);
hasMicro = true;
dateTime = dateTime.plusNanos(amount * 1000);
break;
case "SECOND":
hasTime = true;
Expand Down Expand Up @@ -107,12 +99,13 @@ private String evalDate(String dateString, String interval, Long amount) {
dateTime = dateTime.plusYears(amount);
break;
}

String result = dateTime.toLocalDate().toString();
StringBuilder format = new StringBuilder("yyyy-MM-dd");
if (hasTime) {
result += " " + dateTime.toLocalTime().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
format.append(" HH:mm:ss");
}

return result;
if (hasMicro) {
format.append(".SSSSSS");
}
return dateTime.format(DateUtil.getDateTimeFormatter(format.toString()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import org.apache.inlong.sdk.transform.process.utils.DateUtil;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Objects;

/**
* TimestampDiffFunction -> TIMESTAMPDIFF(unit,datetime_expr1,datetime_expr2)
Expand All @@ -41,14 +41,9 @@
@TransformFunction(names = {"timestamp_diff", "timestampdiff"})
public class TimestampDiffFunction implements ValueParser {

private ValueParser unitParser;
private ValueParser firstDateTimeParser;
private ValueParser secondDateTimeParser;
private static final DateTimeFormatter DEFAULT_FORMAT_DATE_MRO_TIME =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private final ValueParser unitParser;
private final ValueParser firstDateTimeParser;
private final ValueParser secondDateTimeParser;

public TimestampDiffFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
Expand All @@ -72,8 +67,8 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) {
return null;
}
try {
LocalDateTime left = getLocalDate(firstDateTime);
LocalDateTime right = getLocalDate(secondDateTime);
LocalDateTime left = Objects.requireNonNull(DateUtil.parseLocalDateTime(firstDateTime));
LocalDateTime right = Objects.requireNonNull(DateUtil.parseLocalDateTime(secondDateTime));
switch (unit) {
case "MICROSECOND":
return ChronoUnit.MICROS.between(left, right);
Expand All @@ -100,20 +95,4 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) {
return null;
}
}

public LocalDateTime getLocalDate(String dateString) {
DateTimeFormatter formatter = null;
LocalDateTime dateTime = null;
if (dateString.indexOf('.') != -1) {
formatter = DEFAULT_FORMAT_DATE_MRO_TIME;
dateTime = LocalDateTime.parse(dateString, formatter);
} else if (dateString.indexOf(' ') != -1) {
formatter = DEFAULT_FORMAT_DATE_TIME;
dateTime = LocalDateTime.parse(dateString, formatter);
} else {
formatter = DEFAULT_FORMAT_DATE;
dateTime = LocalDate.parse(dateString, formatter).atStartOfDay();
}
return dateTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.pojo.IntervalInfo;
import org.apache.inlong.sdk.transform.process.utils.DateUtil;

import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
import org.apache.commons.lang3.tuple.Pair;

import java.math.BigDecimal;
import java.time.temporal.ChronoField;
import java.util.Map;

/**
* AdditionParser
* Description: Support the addition of numerical values and time
*/
@TransformParser(values = Addition.class)
public class AdditionParser implements ValueParser {
Expand All @@ -49,22 +48,22 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) {
if (this.left instanceof IntervalParser && this.right instanceof IntervalParser) {
return null;
} else if (this.left instanceof IntervalParser || this.right instanceof IntervalParser) {
IntervalParser intervalParser = null;
ValueParser dateParser = null;
IntervalParser intervalParser;
ValueParser dateParser;
if (this.left instanceof IntervalParser) {
intervalParser = (IntervalParser) this.left;
dateParser = this.right;
} else {
intervalParser = (IntervalParser) this.right;
dateParser = this.left;
}
Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context);
Object intervalInfoObj = intervalParser.parse(sourceData, rowIndex, context);
Object dateObj = dateParser.parse(sourceData, rowIndex, context);
if (intervalPairObj == null || dateObj == null) {
if (intervalInfoObj == null || dateObj == null) {
return null;
}
return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
(Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, 1);
return DateUtil.dateTypeAdd(OperatorTools.parseString(dateObj),
(IntervalInfo) intervalInfoObj, true);
} else {
return numericalOperation(sourceData, rowIndex, context);
}
Expand Down
Loading

0 comments on commit d376a1f

Please sign in to comment.