Skip to content

Commit

Permalink
[Fix] fix the decimal precision of postgres transform (#3263)
Browse files Browse the repository at this point in the history
  • Loading branch information
javaht authored Mar 9, 2024
1 parent 24887a2 commit 1129b69
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void addSink(

protected List<Operation> createInsertOperations(
CustomTableEnvironment customTableEnvironment, Table table, String viewName, String tableName) {
String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName);
String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName, config);
logger.info(cdcSqlInsert);

List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.cdc.utils;

import org.dinky.data.model.Column;
import org.dinky.data.model.FlinkCDCConfig;
import org.dinky.data.model.Table;
import org.dinky.utils.SqlUtil;
Expand All @@ -33,7 +34,7 @@ public class FlinkStatementUtil {

private FlinkStatementUtil() {}

public static String getCDCInsertSql(Table table, String targetName, String sourceName) {
public static String getCDCInsertSql(Table table, String targetName, String sourceName, FlinkCDCConfig config) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append("`").append(targetName).append("`");
sb.append(" SELECT\n");
Expand All @@ -42,15 +43,26 @@ public static String getCDCInsertSql(Table table, String targetName, String sour
if (i > 0) {
sb.append(",");
}
sb.append(String.format("`%s`", table.getColumns().get(i).getName()))
.append(" \n");
sb.append(getColumnProcessing(table.getColumns().get(i), config)).append(" \n");
}
sb.append(" FROM `");
sb.append(sourceName);
sb.append("`");
return sb.toString();
}

public static String getColumnProcessing(Column column, FlinkCDCConfig config) {
String configType = config.getType();
String columnType = column.getType();
if (configType.contains("postgres-cdc")
&& (columnType.contains("numeric") || columnType.contains("decimal"))
&& column.getPrecision().intValue() > 38) {
return " CAST(" + column.getName() + " AS STRING) AS `" + column.getName() + "`";
} else {
return String.format("`%s`", column.getName());
}
}

public static String getFlinkDDL(
Table table,
String tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
package org.dinky.metadata.convert;

import org.dinky.data.enums.ColumnType;
import org.dinky.data.model.Column;
import org.dinky.metadata.config.AbstractJdbcConfig;
import org.dinky.metadata.config.DriverConfig;

import java.util.Optional;

/**
* PostgreSqlTypeConvert
Expand All @@ -44,8 +49,8 @@ public PostgreSqlTypeConvert() {
register("float4", ColumnType.FLOAT, ColumnType.JAVA_LANG_FLOAT);
register("float8", ColumnType.DOUBLE, ColumnType.JAVA_LANG_DOUBLE);
register("double precision", ColumnType.DOUBLE, ColumnType.JAVA_LANG_DOUBLE);
register("numeric", ColumnType.DECIMAL);
register("decimal", ColumnType.DECIMAL);
register("numeric", PostgreSqlTypeConvert::convertDecimalOrNumeric);
register("decimal", PostgreSqlTypeConvert::convertDecimalOrNumeric);
register("boolean", ColumnType.BOOLEAN, ColumnType.JAVA_LANG_BOOLEAN);
register("bool", ColumnType.BOOLEAN, ColumnType.JAVA_LANG_BOOLEAN);
register("timestamp", ColumnType.TIMESTAMP);
Expand All @@ -57,4 +62,14 @@ public PostgreSqlTypeConvert() {
register("jsonb", ColumnType.STRING);
register("json", ColumnType.STRING);
}

private static Optional<ColumnType> convertDecimalOrNumeric(
Column column, DriverConfig<AbstractJdbcConfig> driverConfig) {
// 该字段的精度
int intValue = column.getPrecision().intValue();
if (intValue > 38) {
return Optional.of(ColumnType.STRING);
}
return Optional.of(ColumnType.DECIMAL);
}
}

0 comments on commit 1129b69

Please sign in to comment.