Skip to content

Commit

Permalink
[Optimization-3146][CDCSOURCE] Use spotless to format the code
Browse files Browse the repository at this point in the history
  • Loading branch information
kindbgen committed Feb 7, 2024
1 parent 257b002 commit 4921d79
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public String getType() {
return type;
}

private static final Map<String, DataBaseType> MAP = Arrays.stream(values()).collect(Collectors.toMap(DataBaseType::getType, Function.identity()));
private static final Map<String, DataBaseType> MAP =
Arrays.stream(values()).collect(Collectors.toMap(DataBaseType::getType, Function.identity()));

public static DataBaseType get(String type) {
return MAP.get(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@

package org.dinky.cdc.debezium;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

/**
* 处理 Debezium 源库 时间转换的问题
* Debezium 默认将源库中 datetime 类型转成 UTC 的时间戳({@link io.debezium.time.Timestamp}),时区是写死的没法儿改,
Expand Down Expand Up @@ -63,7 +65,6 @@ public class DebeziumCustomConverter implements CustomConverter<SchemaBuilder, R
// 获取默认时区
protected final ZoneId zoneId = ZoneOffset.systemDefault();


@Override
public void configure(Properties properties) {
// 必填参数:database.type。获取数据库的类型,暂时支持mysql、sqlserver、oracle、postgresql
Expand Down Expand Up @@ -95,7 +96,8 @@ public void configure(Properties properties) {
}

@Override
public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
public void converterFor(
RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
schemaBuilder = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

package org.dinky.cdc.debezium.converter;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.spi.converter.RelationalColumn;
import org.dinky.cdc.debezium.DebeziumCustomConverter;

import java.time.Instant;
import java.time.ZoneOffset;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.RelationalColumn;

/**
* @author <a href="mailto:[email protected]">Kindbgen<a/>
* @description Mysql 转换器
Expand All @@ -34,7 +36,8 @@
public class MysqlDebeziumConverter extends DebeziumCustomConverter {

@Override
public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
public void converterFor(
RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
// 获取字段类型
String columnType = relationalColumn.typeName().toUpperCase();
this.registerConverter(columnType, converterRegistration);
Expand Down Expand Up @@ -63,9 +66,7 @@ public void registerConverter(String columnType, ConverterRegistration<SchemaBui
return null;
} else if (value instanceof java.time.Duration) {
return timeFormatter.format(
java.time.LocalTime.
ofNanoOfDay(((java.time.Duration) value)
.toNanos()));
java.time.LocalTime.ofNanoOfDay(((java.time.Duration) value).toNanos()));
} else if (value instanceof java.time.LocalDateTime) {
return datetimeFormatter.format((java.time.LocalDateTime) value);
} else {
Expand All @@ -81,7 +82,9 @@ public void registerConverter(String columnType, ConverterRegistration<SchemaBui
} else if (value instanceof java.time.LocalDateTime) {
return datetimeFormatter.format((java.time.LocalDateTime) value);
} else if (value instanceof java.time.ZonedDateTime) {
return datetimeFormatter.format(((java.time.ZonedDateTime) value).withZoneSameInstant(zoneId).toLocalDateTime());
return datetimeFormatter.format(((java.time.ZonedDateTime) value)
.withZoneSameInstant(zoneId)
.toLocalDateTime());
} else if (value instanceof java.sql.Timestamp) {
return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());
} else if (value instanceof String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.dinky.cdc.debezium.converter;

import org.dinky.cdc.debezium.DebeziumCustomConverter;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.RelationalColumn;
import org.dinky.cdc.debezium.DebeziumCustomConverter;

/**
* @author <a href="mailto:[email protected]">Kindbgen<a/>
Expand All @@ -31,7 +33,8 @@
public class OracleDebeziumConverter extends DebeziumCustomConverter {

@Override
public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
public void converterFor(
RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
// 获取字段类型
String columnType = relationalColumn.typeName().toUpperCase();
this.registerConverter(columnType, converterRegistration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.dinky.cdc.debezium.converter;

import org.dinky.cdc.debezium.DebeziumCustomConverter;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.RelationalColumn;
import org.dinky.cdc.debezium.DebeziumCustomConverter;

/**
* @author <a href="mailto:[email protected]">Kindbgen<a/>
Expand All @@ -30,7 +32,8 @@
*/
public class PostgresDebeziumConverter extends DebeziumCustomConverter {
@Override
public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
public void converterFor(
RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
// 获取字段类型
String columnType = relationalColumn.typeName().toUpperCase();
this.registerConverter(columnType, converterRegistration);
Expand Down Expand Up @@ -84,5 +87,4 @@ public void registerConverter(String columnType, ConverterRegistration<SchemaBui
break;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@

package org.dinky.cdc.debezium.converter;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.spi.converter.RelationalColumn;
import org.dinky.cdc.debezium.DebeziumCustomConverter;

import java.time.ZoneOffset;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.RelationalColumn;

/**
* @author <a href="mailto:[email protected]">Kindbgen<a/>
* @description SqlServer 转换器
* @date 2024/2/6
*/
public class SqlServerDebeziumConverter extends DebeziumCustomConverter {
@Override
public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
public void converterFor(
RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
// 获取字段类型
String columnType = relationalColumn.typeName().toUpperCase();
this.registerConverter(columnType, converterRegistration);
Expand Down Expand Up @@ -62,7 +65,8 @@ public void registerConverter(String columnType, ConverterRegistration<SchemaBui
} else if (value instanceof java.sql.Time) {
return timeFormatter.format(((java.sql.Time) value).toLocalTime());
} else if (value instanceof java.sql.Timestamp) {
return timeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime().toLocalTime());
return timeFormatter.format(
((java.sql.Timestamp) value).toLocalDateTime().toLocalTime());
} else if (value instanceof java.time.LocalDateTime) {
return datetimeFormatter.format((java.time.LocalDateTime) value);
} else {
Expand All @@ -81,7 +85,10 @@ public void registerConverter(String columnType, ConverterRegistration<SchemaBui
return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());
} else if (value instanceof microsoft.sql.DateTimeOffset) {
microsoft.sql.DateTimeOffset dateTimeOffset = (microsoft.sql.DateTimeOffset) value;
return datetimeFormatter.format(dateTimeOffset.getOffsetDateTime().withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime());
return datetimeFormatter.format(dateTimeOffset
.getOffsetDateTime()
.withOffsetSameInstant(ZoneOffset.UTC)
.toLocalDateTime());
} else if (value instanceof java.time.LocalDateTime) {
return datetimeFormatter.format((java.time.LocalDateTime) value);
} else {
Expand All @@ -94,5 +101,4 @@ public void registerConverter(String columnType, ConverterRegistration<SchemaBui
break;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,4 @@ public static void checkNullMap(Map<?, ?> map, String msg) {
public static boolean isContainsString(String str1, String str2) {
return !isNullString(str1) && str1.contains(str2);
}

}
1 change: 1 addition & 0 deletions dinky-common/src/main/java/org/dinky/data/model/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class Table implements Serializable, Comparable<Table>, Cloneable {

/** 驱动类型, @see org.dinky.metadata.enums.DriverType */
private String driverType;

public Table() {}

public Table(String name, String schema, List<Column> columns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
* @since 2024/2/6
*/
public enum DriverType {

MYSQL("MySQL"),
ORACLE("Oracle"),
POSTGRESQL("PostgreSql"),
Expand All @@ -53,7 +52,8 @@ public String getValue() {
return value;
}

private static final Map<String, DriverType> MAP = Arrays.stream(values()).collect(Collectors.toMap(DriverType::getValue, Function.identity()));
private static final Map<String, DriverType> MAP =
Arrays.stream(values()).collect(Collectors.toMap(DriverType::getValue, Function.identity()));

public static DriverType get(String value) {
return MAP.get(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ public class MysqlType {
private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL";
private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED";
private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
"DOUBLE PRECISION UNSIGNED ZEROFILL";
private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL = "DOUBLE PRECISION UNSIGNED ZEROFILL";
private static final String NUMERIC = "NUMERIC";
private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL";
Expand Down Expand Up @@ -144,20 +143,15 @@ public static String toDorisType(String type, Integer length, Integer scale) {
case DECIMAL_UNSIGNED_ZEROFILL:
return length != null && length <= 38
? String.format(
"%s(%s,%s)",
DorisType.DECIMAL_V3,
length,
scale != null && scale >= 0 ? scale : 0)
"%s(%s,%s)", DorisType.DECIMAL_V3, length, scale != null && scale >= 0 ? scale : 0)
: DorisType.STRING;
case DATE:
return DorisType.DATE_V2;
case DATETIME:
case TIMESTAMP:
// default precision is 0
// see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
if (length == null
|| length <= 0
|| length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
if (length == null || length <= 0 || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
} else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
// Timestamp with a fraction of seconds.
Expand All @@ -174,20 +168,15 @@ public static String toDorisType(String type, Integer length, Integer scale) {
// For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9.
return String.format(
"%s(%s)",
DorisType.DATETIME_V2,
Math.min(length, DorisConstant.MAX_SUPPORTED_DATE_TIME_PRECISION));
DorisType.DATETIME_V2, Math.min(length, DorisConstant.MAX_SUPPORTED_DATE_TIME_PRECISION));
} else {
throw new UnsupportedOperationException(
"Unsupported length: "
+ length
+ " for MySQL TIMESTAMP/DATETIME types");
"Unsupported length: " + length + " for MySQL TIMESTAMP/DATETIME types");
}
case CHAR:
case VARCHAR:
Asserts.checkNotNull(length, "VARCHAR length is null");
return length * 3 > 65533
? DorisType.STRING
: String.format("%s(%s)", DorisType.VARCHAR, length * 3);
return length * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, length * 3);
case TINYTEXT:
case TEXT:
case MEDIUMTEXT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public static String toDorisType(String oracleType, Integer precision, Integer s
}
return precision != null && precision <= 38
? String.format(
"%s(%s,%s)",
DorisType.DECIMAL_V3,
precision,
scale != null && scale >= 0 ? scale : 0)
"%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
: DorisType.STRING;
case FLOAT:
return DorisType.DOUBLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,15 @@ public static String toDorisType(String postgresType, Integer precision, Integer
case NUMERIC:
return precision != null && precision > 0 && precision <= 38
? String.format(
"%s(%s,%s)",
DorisType.DECIMAL_V3,
precision,
scale != null && scale >= 0 ? scale : 0)
"%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
: DorisType.STRING;
case FLOAT4:
return DorisType.FLOAT;
case FLOAT8:
return DorisType.DOUBLE;
case TIMESTAMP:
case TIMESTAMPTZ:
return String.format(
"%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6));
return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6));
case DATE:
return DorisType.DATE_V2;
case BOOL:
Expand Down Expand Up @@ -154,8 +150,7 @@ public static String toDorisType(String postgresType, Integer precision, Integer
return String.format("%s<%s>", DorisType.ARRAY, DorisType.DATETIME_V2);
**/
default:
throw new UnsupportedOperationException(
"Unsupported Postgres Type: " + postgresType);
throw new UnsupportedOperationException("Unsupported Postgres Type: " + postgresType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,14 @@ public static String toDorisType(String originSqlServerType, Integer precision,
case NUMERIC:
return precision != null && precision > 0 && precision <= 38
? String.format(
"%s(%s,%s)",
DorisType.DECIMAL_V3,
precision,
scale != null && scale >= 0 ? scale : 0)
"%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
: DorisType.STRING;
case DATE:
return DorisType.DATE_V2;
case DATETIME:
case DATETIME2:
case SMALLDATETIME:
return String.format(
"%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6));
return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6));
case CHAR:
case VARCHAR:
case NCHAR:
Expand All @@ -108,8 +104,7 @@ public static String toDorisType(String originSqlServerType, Integer precision,
case VARBINARY:
return DorisType.STRING;
default:
throw new UnsupportedOperationException(
"Unsupported SqlServer Type: " + sqlServerType);
throw new UnsupportedOperationException("Unsupported SqlServer Type: " + sqlServerType);
}
}
}
Loading

0 comments on commit 4921d79

Please sign in to comment.