Skip to content

Commit

Permalink
[bugfix][plugin][kudu] Fix the timezone issue when writing and readin…
Browse files Browse the repository at this point in the history
…g timestamp type data. (#1177)

1. Fix the timezone issue when writing and reading timestamp type data.
2. Add support for `!=` operator
  • Loading branch information
wgzhao authored Nov 1, 2024
1 parent c05fc5a commit 4120791
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
7 changes: 3 additions & 4 deletions docs/reader/kudureader.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,12 @@ bin/addax.sh job/kudu2stream.json
上述定义了两个过滤条件,每个过滤条件由三部分组成,格式为 `column operator value`

- `column`: 要过滤的字段
- `operator`: 比较符号,当前仅支持 `=`, `>`, '>=', `<`, `<=` , 其他操作符号当前还不支持
- `value`: 比较值,如果是字符串,可以加上单引号(`'`), 不加可以,因为实际类型会从数据库表中获取对应字段(`column`)的类型,但如果值含有空格,则一定要加上单引号
- `operator`: 比较符号,当前仅支持 `=`, `>`, `>=`, `<`, `<=` , `!=` 其他操作符号当前还不支持
- `value`: 比较值

这里还有其他一些限定,在使用时,要特别注意:

1. 上述三个部分之间至少有一个空格 `age>1`, `age >1` 这种均无效,这是因为我们实际上是把 SQL 风格的过滤提交转换为 Kudu 的 [KuduPredicate](https://kudu.apache.org/releases/1.14.0/apidocs/org/apache/kudu/client/KuduPredicate.html)
2. 多个过滤条件之间的逻辑与关系(`AND`),暂不支持逻辑或(`OR`)关系
1. 多个过滤条件之间的逻辑与关系(`AND`),暂不支持逻辑或(`OR`)关系

## 类型转换

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

import com.wgzhao.addax.common.element.BoolColumn;
import com.wgzhao.addax.common.element.BytesColumn;
import com.wgzhao.addax.common.element.DateColumn;
import com.wgzhao.addax.common.element.DoubleColumn;
import com.wgzhao.addax.common.element.LongColumn;
import com.wgzhao.addax.common.element.Record;
import com.wgzhao.addax.common.element.StringColumn;
import com.wgzhao.addax.common.element.TimestampColumn;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.spi.Reader;
Expand All @@ -44,14 +44,19 @@
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_DATE_FORMAT;
import static com.wgzhao.addax.common.base.Key.COLUMN;
import static com.wgzhao.addax.common.base.Key.WHERE;
import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
import static com.wgzhao.addax.common.spi.ErrorCode.ILLEGAL_VALUE;
import static com.wgzhao.addax.common.spi.ErrorCode.NOT_SUPPORT_TYPE;
import static com.wgzhao.addax.common.spi.ErrorCode.RUNTIME_ERROR;
Expand All @@ -75,7 +80,7 @@ public static class Job

private String upperBound;
// match where clause such as age > 18
private static final String PATTERN_FOR_WHERE = "^(\\w+)\\s+(=|>|>=|<|<=)\\s+(.*)$";
private static final String PATTERN_FOR_WHERE = "^\\s*(\\w+)\\s*(>=|<|<=|!=|=|>)\\s*(.*)$";
private static final Pattern pattern = Pattern.compile(PATTERN_FOR_WHERE);

@Override
Expand Down Expand Up @@ -306,7 +311,9 @@ public void startRead(RecordSender recordSender)
record.addColumn(new DoubleColumn(result.getDouble(columnSchema.getName())));
break;
case UNIXTIME_MICROS:
record.addColumn(new DateColumn(result.getTimestamp(columnSchema.getName())));
int offsetSecs = ZonedDateTime.now(ZoneId.systemDefault()).getOffset().getTotalSeconds();
long ts = result.getLong(columnSchema.getName()) / 1_000L - offsetSecs * 1_000L;
record.addColumn(new TimestampColumn(ts));
break;
case DECIMAL:
record.addColumn(new DoubleColumn(result.getDecimal(columnSchema.getName())));
Expand Down Expand Up @@ -448,7 +455,16 @@ private List<KuduPredicate> processWhere(List<Configuration> where, Schema schem
predicate = KuduPredicate.newComparisonPredicate(column, op, value.getBytes(StandardCharsets.UTF_8));
break;
case UNIXTIME_MICROS:
predicate = KuduPredicate.newComparisonPredicate(column, op, Timestamp.valueOf(value));
SimpleDateFormat sdf = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
try {
java.util.Date date = sdf.parse(value);
int offsetSecs = ZonedDateTime.now(ZoneId.systemDefault()).getOffset().getTotalSeconds();
long ts = date.getTime() * 1_000L + offsetSecs * 1_000_000L;
predicate = KuduPredicate.newComparisonPredicate(column, op, ts);
}
catch (ParseException e) {
throw AddaxException.asAddaxException(CONFIG_ERROR, "Can not parse date: " + value);
}
break;
default:
throw new IllegalStateException("Unexpected type: " + column.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_BATCH_SIZE;
Expand Down Expand Up @@ -136,7 +138,10 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
row.addDecimal(name, new BigDecimal(column.asString()));
break;
case UNIXTIME_MICROS:
row.addTimestamp(name, new Timestamp(column.asLong()));
// need convert local timestamp to UTC
int offsetSecs = ZonedDateTime.now(ZoneId.systemDefault()).getOffset().getTotalSeconds();
// use nanos timestamp value
row.addLong(name, (column.asTimestamp().getTime() * 1_000L + offsetSecs * 1_000_000L));
break;
case DATE:
// Kudu take date as string
Expand Down

0 comments on commit 4120791

Please sign in to comment.