Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix][plugin][kudu] Fix the timezone issue #1177

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions docs/reader/kudureader.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,12 @@ bin/addax.sh job/kudu2stream.json
上述定义了两个过滤条件,每个过滤条件由三部分组成,格式为 `column operator value`

- `column`: 要过滤的字段
- `operator`: 比较符号,当前仅支持 `=`, `>`, '>=', `<`, `<=` , 其他操作符号当前还不支持
- `value`: 比较值,如果是字符串,可以加上单引号(`'`), 不加可以,因为实际类型会从数据库表中获取对应字段(`column`)的类型,但如果值含有空格,则一定要加上单引号
- `operator`: 比较符号,当前仅支持 `=`, `>`, `>=`, `<`, `<=` , `!=` 其他操作符号当前还不支持
- `value`: 比较值

这里还有其他一些限定,在使用时,要特别注意:

1. 上述三个部分之间至少有一个空格 `age>1`, `age >1` 这种均无效,这是因为我们实际上是把 SQL 风格的过滤提交转换为 Kudu 的 [KuduPredicate](https://kudu.apache.org/releases/1.14.0/apidocs/org/apache/kudu/client/KuduPredicate.html) 类
2. 多个过滤条件之间的逻辑与关系(`AND`),暂不支持逻辑或(`OR`)关系
1. 多个过滤条件之间的逻辑与关系(`AND`),暂不支持逻辑或(`OR`)关系

## 类型转换

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

import com.wgzhao.addax.common.element.BoolColumn;
import com.wgzhao.addax.common.element.BytesColumn;
import com.wgzhao.addax.common.element.DateColumn;
import com.wgzhao.addax.common.element.DoubleColumn;
import com.wgzhao.addax.common.element.LongColumn;
import com.wgzhao.addax.common.element.Record;
import com.wgzhao.addax.common.element.StringColumn;
import com.wgzhao.addax.common.element.TimestampColumn;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.spi.Reader;
Expand All @@ -44,14 +44,19 @@
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_DATE_FORMAT;
import static com.wgzhao.addax.common.base.Key.COLUMN;
import static com.wgzhao.addax.common.base.Key.WHERE;
import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
import static com.wgzhao.addax.common.spi.ErrorCode.ILLEGAL_VALUE;
import static com.wgzhao.addax.common.spi.ErrorCode.NOT_SUPPORT_TYPE;
import static com.wgzhao.addax.common.spi.ErrorCode.RUNTIME_ERROR;
Expand All @@ -75,7 +80,7 @@ public static class Job

private String upperBound;
// match where clause such as age > 18
private static final String PATTERN_FOR_WHERE = "^(\\w+)\\s+(=|>|>=|<|<=)\\s+(.*)$";
private static final String PATTERN_FOR_WHERE = "^\\s*(\\w+)\\s*(>=|<|<=|!=|=|>)\\s*(.*)$";
private static final Pattern pattern = Pattern.compile(PATTERN_FOR_WHERE);

@Override
Expand Down Expand Up @@ -306,7 +311,9 @@ public void startRead(RecordSender recordSender)
record.addColumn(new DoubleColumn(result.getDouble(columnSchema.getName())));
break;
case UNIXTIME_MICROS:
record.addColumn(new DateColumn(result.getTimestamp(columnSchema.getName())));
int offsetSecs = ZonedDateTime.now(ZoneId.systemDefault()).getOffset().getTotalSeconds();
long ts = result.getLong(columnSchema.getName()) / 1_000L - offsetSecs * 1_000L;
record.addColumn(new TimestampColumn(ts));
break;
case DECIMAL:
record.addColumn(new DoubleColumn(result.getDecimal(columnSchema.getName())));
Expand Down Expand Up @@ -448,7 +455,16 @@ private List<KuduPredicate> processWhere(List<Configuration> where, Schema schem
predicate = KuduPredicate.newComparisonPredicate(column, op, value.getBytes(StandardCharsets.UTF_8));
break;
case UNIXTIME_MICROS:
predicate = KuduPredicate.newComparisonPredicate(column, op, Timestamp.valueOf(value));
SimpleDateFormat sdf = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
try {
java.util.Date date = sdf.parse(value);
int offsetSecs = ZonedDateTime.now(ZoneId.systemDefault()).getOffset().getTotalSeconds();
long ts = date.getTime() * 1_000L + offsetSecs * 1_000_000L;
predicate = KuduPredicate.newComparisonPredicate(column, op, ts);
}
catch (ParseException e) {
throw AddaxException.asAddaxException(CONFIG_ERROR, "Can not parse date: " + value);
}
break;
default:
throw new IllegalStateException("Unexpected type: " + column.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_BATCH_SIZE;
Expand Down Expand Up @@ -136,7 +138,10 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
row.addDecimal(name, new BigDecimal(column.asString()));
break;
case UNIXTIME_MICROS:
row.addTimestamp(name, new Timestamp(column.asLong()));
// need convert local timestamp to UTC
int offsetSecs = ZonedDateTime.now(ZoneId.systemDefault()).getOffset().getTotalSeconds();
// use nanos timestamp value
row.addLong(name, (column.asTimestamp().getTime() * 1_000L + offsetSecs * 1_000_000L));
break;
case DATE:
// Kudu take date as string
Expand Down