diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java index e7a1c8c2c5e..51a5195d077 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.client; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException; @@ -42,10 +43,10 @@ public class StarRocksSinkManager { private int batchRowCount = 0; private long batchBytesSize = 0; - public StarRocksSinkManager(SinkConfig sinkConfig, List fileNames) { + public StarRocksSinkManager(SinkConfig sinkConfig, TableSchema tableSchema) { this.sinkConfig = sinkConfig; this.batchList = new ArrayList<>(); - starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, fileNames); + starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, tableSchema); } private void tryInit() throws IOException { diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java index d004213b7a8..c0283399a03 100644 --- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java @@ -17,12 +17,15 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.client; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException; import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser; +import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksSinkOP; import org.apache.commons.codec.binary.Base64; @@ -56,11 +59,11 @@ public class StarRocksStreamLoadVisitor { private static final String RESULT_LABEL_ABORTED = "ABORTED"; private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; - private List fieldNames; + private final TableSchema tableSchema; - public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List fieldNames) { + public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, TableSchema tableSchema) { this.sinkConfig = sinkConfig; - this.fieldNames = fieldNames; + this.tableSchema = tableSchema; this.httpHelper = new HttpHelper(sinkConfig); } @@ -260,16 +263,19 @@ private String getBasicAuthHeader(String username, String password) { private Map getStreamLoadHttpHeader(String label) { Map headerMap = new HashMap<>(); - if (null != fieldNames - && !fieldNames.isEmpty() + List columns = tableSchema.getColumns(); + List fieldNames = + 
columns.stream().map(Column::getName).collect(Collectors.toList()); + if (sinkConfig.isEnableUpsertDelete()) { + fieldNames.add(StarRocksSinkOP.COLUMN_KEY); + } + if (!fieldNames.isEmpty() && SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { headerMap.put( "columns", - String.join( - ",", - fieldNames.stream() - .map(f -> String.format("`%s`", f)) - .collect(Collectors.toList()))); + fieldNames.stream() + .map(f -> String.format("`%s`", f)) + .collect(Collectors.joining(","))); } if (null != sinkConfig.getStreamLoadProps()) { for (Map.Entry entry : sinkConfig.getStreamLoadProps().entrySet()) { diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/datatypes/StarRocksType.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/datatypes/StarRocksType.java new file mode 100644 index 00000000000..56511e64948 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/datatypes/StarRocksType.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class StarRocksType { + public static final String SR_NULL = "NULL"; + public static final String SR_BOOLEAN = "BOOLEAN"; + public static final String SR_TINYINT = "TINYINT"; + public static final String SR_SMALLINT = "SMALLINT"; + public static final String SR_INT = "INT"; + public static final String SR_BIGINT = "BIGINT"; + public static final String SR_LARGEINT = "LARGEINT"; + public static final String SR_FLOAT = "FLOAT"; + public static final String SR_DOUBLE = "DOUBLE"; + public static final String SR_DECIMAL = "DECIMAL"; + public static final String SR_DECIMALV3 = "DECIMALV3"; + public static final String SR_DATE = "DATE"; + public static final String SR_DATETIME = "DATETIME"; + public static final String SR_CHAR = "CHAR"; + public static final String SR_VARCHAR = "VARCHAR"; + public static final String SR_STRING = "STRING"; + + public static final String SR_BOOLEAN_ARRAY = "ARRAY"; + public static final String SR_TINYINT_ARRAY = "ARRAY"; + public static final String SR_SMALLINT_ARRAY = "ARRAY"; + public static final String SR_INT_ARRAY = "ARRAY"; + public static final String SR_BIGINT_ARRAY = "ARRAY"; + public static final String SR_FLOAT_ARRAY = "ARRAY"; + public static final String SR_DOUBLE_ARRAY = "ARRAY"; + public static final String SR_DECIMALV3_ARRAY = "ARRAY"; + public static final String SR_DECIMALV3_ARRAY_COLUMN_TYPE_TMP = "ARRAY"; + public static final String SR_DATEV2_ARRAY = "ARRAY"; + public static final String SR_DATETIMEV2_ARRAY = "ARRAY"; + public static final String SR_STRING_ARRAY = "ARRAY"; + + // Because can not get the column length from array, So the following types of arrays cannot be + // generated properly. 
+ public static final String SR_LARGEINT_ARRAY = "ARRAY"; + public static final String SR_CHAR_ARRAY = "ARRAY"; + public static final String SR_CHAR_ARRAY_COLUMN_TYPE_TMP = "ARRAY"; + public static final String SR_VARCHAR_ARRAY = "ARRAY"; + public static final String SR_VARCHAR_ARRAY_COLUMN_TYPE_TMP = "ARRAY"; + + public static final String SR_JSON = "JSON"; + public static final String SR_JSONB = "JSONB"; + + public static final String SR_ARRAY = "ARRAY"; + + public static final String SR_ARRAY_BOOLEAN_INTER = "tinyint(1)"; + public static final String SR_ARRAY_TINYINT_INTER = "tinyint(4)"; + public static final String SR_ARRAY_SMALLINT_INTER = "smallint(6)"; + public static final String SR_ARRAY_INT_INTER = "int(11)"; + public static final String SR_ARRAY_BIGINT_INTER = "bigint(20)"; + public static final String SR_ARRAY_DECIMAL_PRE = "DECIMAL"; + public static final String SR_ARRAY_DATE_INTER = "date"; + public static final String SR_ARRAY_DATEV2_INTER = "DATEV2"; + public static final String SR_ARRAY_DATETIME_INTER = "DATETIME"; + public static final String SR_ARRAY_DATETIMEV2_INTER = "DATETIMEV2"; + + public static final String SR_MAP = "MAP"; + public static final String SR_MAP_COLUMN_TYPE = "MAP<%s, %s>"; + + public static final String SR_BOOLEAN_INDENTFIER = "TINYINT(1)"; + + private String type; +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/datatypes/StarRocksTypeConverter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/datatypes/StarRocksTypeConverter.java new file mode 100644 index 00000000000..146f7eeb8bb --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/datatypes/StarRocksTypeConverter.java @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalArrayType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.common.exception.CommonError; + +import org.apache.commons.lang3.StringUtils; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +import java.util.Locale; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_BIGINT_INTER; +import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_BOOLEAN_INTER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_DATETIMEV2_INTER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_DATETIME_INTER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_DATEV2_INTER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_DATE_INTER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_DECIMAL_PRE; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_INT_INTER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_SMALLINT_INTER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_ARRAY_TINYINT_INTER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_BIGINT; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_BIGINT_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_BOOLEAN; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_BOOLEAN_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_BOOLEAN_INDENTFIER; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_CHAR; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DATE; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DATETIME; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DATETIMEV2_ARRAY; +import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DATEV2_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DECIMAL; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DECIMALV3; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DECIMALV3_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DECIMALV3_ARRAY_COLUMN_TYPE_TMP; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DOUBLE; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_DOUBLE_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_FLOAT; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_FLOAT_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_INT; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_INT_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_JSON; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_LARGEINT; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_MAP; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_MAP_COLUMN_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_NULL; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_SMALLINT; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_SMALLINT_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_STRING; +import 
static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_STRING_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_TINYINT; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_TINYINT_ARRAY; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType.SR_VARCHAR; + +/** Starrocks type converter for catalog. */ +@Slf4j +@AutoService(TypeConverter.class) +public class StarRocksTypeConverter implements TypeConverter> { + + public static final long MAX_STRING_LENGTH = 2147483643; + public static final Long MAX_PRECISION = 38L; + public static final Integer MAX_SCALE = 10; + public static final Integer MAX_DATETIME_SCALE = 6; + public static final long POWER_2_8 = (long) Math.pow(2, 8); + + public static final StarRocksTypeConverter INSTANCE = new StarRocksTypeConverter(); + + @Override + public String identifier() { + return "StarRocks"; + } + + @Override + public Column convert(BasicTypeDefine typeDefine) { + PhysicalColumn.PhysicalColumnBuilder builder = + PhysicalColumn.builder() + .name(typeDefine.getName()) + .sourceType(typeDefine.getColumnType()) + .nullable(typeDefine.isNullable()) + .defaultValue(typeDefine.getDefaultValue()) + .comment(typeDefine.getComment()); + String type = getOriginalType(typeDefine); + + switch (type) { + case SR_NULL: + builder.dataType(BasicType.VOID_TYPE); + break; + case SR_BOOLEAN: + builder.dataType(BasicType.BOOLEAN_TYPE); + break; + case SR_TINYINT: + if (SR_BOOLEAN_INDENTFIER.equalsIgnoreCase(typeDefine.getColumnType())) { + builder.dataType(BasicType.BOOLEAN_TYPE); + } else { + builder.dataType(BasicType.BYTE_TYPE); + } + break; + case SR_SMALLINT: + builder.dataType(BasicType.SHORT_TYPE); + break; + case SR_INT: + builder.dataType(BasicType.INT_TYPE); + break; + case SR_BIGINT: + builder.dataType(BasicType.LONG_TYPE); + break; + case SR_LARGEINT: + DecimalType decimalType; + 
decimalType = new DecimalType(20, 0); + builder.dataType(decimalType); + builder.columnLength(20L); + builder.scale(0); + break; + case SR_FLOAT: + builder.dataType(BasicType.FLOAT_TYPE); + break; + case SR_DOUBLE: + builder.dataType(BasicType.DOUBLE_TYPE); + break; + case SR_DECIMAL: + case SR_DECIMALV3: + setDecimalType(builder, typeDefine); + break; + case SR_CHAR: + case SR_VARCHAR: + if (typeDefine.getLength() != null && typeDefine.getLength() > 0) { + builder.columnLength(typeDefine.getLength()); + } + builder.dataType(BasicType.STRING_TYPE); + break; + case SR_STRING: + case SR_JSON: + builder.dataType(BasicType.STRING_TYPE); + builder.columnLength(MAX_STRING_LENGTH); + break; + case SR_DATE: + builder.dataType(LocalTimeType.LOCAL_DATE_TYPE); + break; + case SR_DATETIME: + builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE); + builder.scale(typeDefine.getScale() == null ? 0 : typeDefine.getScale()); + break; + case SR_ARRAY: + convertArray(typeDefine.getColumnType(), builder, typeDefine.getName()); + break; + case SR_MAP: + convertMap(typeDefine.getColumnType(), builder, typeDefine.getName()); + break; + default: + throw CommonError.convertToSeaTunnelTypeError( + identifier(), typeDefine.getColumnType(), typeDefine.getName()); + } + return builder.build(); + } + + @Override + public BasicTypeDefine reconvert(Column column) { + BasicTypeDefine.BasicTypeDefineBuilder builder = + BasicTypeDefine.builder() + .name(column.getName()) + .nullable(column.isNullable()) + .comment(column.getComment()) + .defaultValue(column.getDefaultValue()); + switch (column.getDataType().getSqlType()) { + case NULL: + builder.columnType(SR_NULL); + builder.dataType(SR_NULL); + break; + case BYTES: + builder.columnType(SR_STRING); + builder.dataType(SR_STRING); + break; + case BOOLEAN: + builder.columnType(SR_BOOLEAN); + builder.dataType(SR_BOOLEAN); + builder.length(1L); + break; + case TINYINT: + builder.columnType(SR_TINYINT); + builder.dataType(SR_TINYINT); + break; + case 
SMALLINT: + builder.columnType(SR_SMALLINT); + builder.dataType(SR_SMALLINT); + break; + case INT: + builder.columnType(SR_INT); + builder.dataType(SR_INT); + break; + case BIGINT: + builder.columnType(SR_BIGINT); + builder.dataType(SR_BIGINT); + break; + case FLOAT: + builder.columnType(SR_FLOAT); + builder.dataType(SR_FLOAT); + break; + case DOUBLE: + builder.columnType(SR_DOUBLE); + builder.dataType(SR_DOUBLE); + break; + case DECIMAL: + // DORIS LARGEINT + if (column.getSourceType() != null + && column.getSourceType().equalsIgnoreCase(SR_LARGEINT)) { + builder.dataType(SR_LARGEINT); + builder.columnType(SR_LARGEINT); + break; + } + DecimalType decimalType = (DecimalType) column.getDataType(); + int precision = decimalType.getPrecision(); + int scale = decimalType.getScale(); + if (precision <= 0) { + precision = MAX_PRECISION.intValue(); + scale = MAX_SCALE; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which is precision less than 0, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + precision, + scale); + } else if (precision > MAX_PRECISION) { + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which exceeds the maximum precision of {}, " + + "it will be converted to varchar(200)", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + MAX_PRECISION); + builder.dataType(SR_VARCHAR); + builder.columnType(String.format("%s(%s)", SR_VARCHAR, 200)); + break; + } + + if (scale < 0) { + scale = 0; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which is scale less than 0, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + precision, + scale); + } else if (scale > precision) { + scale = precision; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which exceeds the 
maximum scale of {}, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + precision, + precision, + scale); + } + builder.columnType(String.format("%s(%s,%s)", SR_DECIMALV3, precision, scale)); + builder.dataType(SR_DECIMALV3); + builder.precision((long) precision); + builder.scale(scale); + break; + case TIME: + builder.length(8L); + builder.columnType(String.format("%s(%s)", SR_VARCHAR, 8)); + builder.dataType(SR_VARCHAR); + break; + case ARRAY: + SeaTunnelDataType dataType = column.getDataType(); + SeaTunnelDataType elementType = null; + if (dataType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) dataType; + elementType = arrayType.getElementType(); + } + reconvertBuildArrayInternal(elementType, builder, column.getName()); + break; + case ROW: + builder.columnType(SR_JSON); + builder.dataType(SR_JSON); + break; + case STRING: + reconvertString(column, builder); + break; + case DATE: + builder.columnType(SR_DATE); + builder.dataType(SR_DATE); + break; + case TIMESTAMP: + builder.columnType(SR_DATETIME); + builder.dataType(SR_DATETIME); + break; + case MAP: + reconvertMap(column, builder); + break; + default: + throw CommonError.convertToConnectorTypeError( + identifier(), column.getDataType().getSqlType().name(), column.getName()); + } + + return builder.build(); + } + + private void setDecimalType( + PhysicalColumn.PhysicalColumnBuilder builder, + BasicTypeDefine typeDefine) { + Long p = 10L; + int scale = 0; + if (typeDefine.getPrecision() != null && typeDefine.getPrecision() > 0) { + p = typeDefine.getPrecision(); + } + + if (typeDefine.getScale() != null && typeDefine.getScale() > 0) { + scale = typeDefine.getScale(); + } + DecimalType decimalType; + decimalType = new DecimalType(p.intValue(), scale); + builder.dataType(decimalType); + builder.columnLength(p); + builder.scale(scale); + } + + private void convertArray( + String columnType, PhysicalColumn.PhysicalColumnBuilder 
builder, String name) { + String columnInterType = extractArrayType(columnType); + if (columnInterType.equalsIgnoreCase(SR_ARRAY_BOOLEAN_INTER)) { + builder.dataType(ArrayType.BOOLEAN_ARRAY_TYPE); + } else if (columnInterType.equalsIgnoreCase(SR_ARRAY_TINYINT_INTER)) { + builder.dataType(ArrayType.BYTE_ARRAY_TYPE); + } else if (columnInterType.equalsIgnoreCase(SR_ARRAY_SMALLINT_INTER)) { + builder.dataType(ArrayType.SHORT_ARRAY_TYPE); + } else if (columnInterType.equalsIgnoreCase(SR_ARRAY_INT_INTER)) { + builder.dataType(ArrayType.INT_ARRAY_TYPE); + } else if (columnInterType.equalsIgnoreCase(SR_ARRAY_BIGINT_INTER)) { + builder.dataType(ArrayType.LONG_ARRAY_TYPE); + } else if (columnInterType.equalsIgnoreCase(SR_FLOAT)) { + builder.dataType(ArrayType.FLOAT_ARRAY_TYPE); + } else if (columnInterType.equalsIgnoreCase(SR_DOUBLE)) { + builder.dataType(ArrayType.DOUBLE_ARRAY_TYPE); + } else if (columnInterType.toUpperCase(Locale.ROOT).startsWith("CHAR") + || columnInterType.toUpperCase(Locale.ROOT).startsWith("VARCHAR") + || columnInterType.equalsIgnoreCase(SR_STRING)) { + builder.dataType(ArrayType.STRING_ARRAY_TYPE); + } else if (columnInterType.toUpperCase(Locale.ROOT).startsWith(SR_ARRAY_DECIMAL_PRE)) { + int[] precisionAndScale = getPrecisionAndScale(columnInterType); + DecimalArrayType decimalArray = + new DecimalArrayType( + new DecimalType(precisionAndScale[0], precisionAndScale[1])); + builder.dataType(decimalArray); + } else if (columnInterType.equalsIgnoreCase(SR_ARRAY_DATE_INTER) + || columnInterType.equalsIgnoreCase(SR_ARRAY_DATEV2_INTER)) { + builder.dataType(ArrayType.LOCAL_DATE_ARRAY_TYPE); + } else if (columnInterType.equalsIgnoreCase(SR_ARRAY_DATETIME_INTER) + || columnInterType.equalsIgnoreCase(SR_ARRAY_DATETIMEV2_INTER)) { + builder.dataType(ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE); + } else if (columnInterType.equalsIgnoreCase(SR_LARGEINT)) { + DecimalArrayType decimalArray = new DecimalArrayType(new DecimalType(20, 0)); + 
builder.dataType(decimalArray); + } else { + throw CommonError.convertToSeaTunnelTypeError(identifier(), columnType, name); + } + } + + private static String extractArrayType(String input) { + Pattern pattern = Pattern.compile("<(.*?)>"); + Matcher matcher = pattern.matcher(input); + + return matcher.find() ? matcher.group(1) : ""; + } + + private void convertMap( + String columnType, PhysicalColumn.PhysicalColumnBuilder builder, String name) { + String[] keyValueType = + Optional.ofNullable(extractMapKeyValueType(columnType)) + .orElseThrow( + () -> + new IllegalArgumentException( + "Invalid map type: " + columnType)); + MapType mapType = + new MapType( + turnColumnTypeToSeaTunnelType(keyValueType[0], name + ".key"), + turnColumnTypeToSeaTunnelType(keyValueType[1], name + ".value")); + builder.dataType(mapType); + } + + private static String[] extractMapKeyValueType(String input) { + String[] result = new String[2]; + input = input.replaceAll("map<", "").replaceAll("MAP<", "").replaceAll(">", ""); + String[] split = input.split(","); + if (split.length == 4) { + // decimal(10,2),decimal(10,2) + result[0] = split[0] + "," + split[1]; + result[1] = split[2] + "," + split[3]; + } else if (split.length == 3) { + // decimal(10,2), date + // decimal(10, 2), varchar(20) + if (split[0].contains("(") && split[1].contains(")")) { + result[0] = split[0] + "," + split[1]; + result[1] = split[2]; + } else if (split[1].contains("(") && split[2].contains(")")) { + // date, decimal(10, 2) + // varchar(20), decimal(10, 2) + result[0] = split[0]; + result[1] = split[1] + "," + split[2]; + } else { + return null; + } + } else if (split.length == 2) { + result[0] = split[0]; + result[1] = split[1]; + } else { + return null; + } + return result; + } + + private SeaTunnelDataType turnColumnTypeToSeaTunnelType(String columnType, String columnName) { + BasicTypeDefine keyBasicTypeDefine = + BasicTypeDefine.builder() + .columnType(columnType) + .name(columnName) + .build(); + if 
(columnType.toUpperCase(Locale.ROOT).startsWith(SR_ARRAY_DECIMAL_PRE)) { + int[] precisionAndScale = getPrecisionAndScale(columnType); + keyBasicTypeDefine.setPrecision((long) precisionAndScale[0]); + keyBasicTypeDefine.setScale(precisionAndScale[1]); + } + Column column = convert(keyBasicTypeDefine); + return column.getDataType(); + } + + private String getOriginalType(BasicTypeDefine typeDefine) { + String columnType = typeDefine.getColumnType().toUpperCase(Locale.ROOT); + if (StringUtils.isBlank(columnType)) { + throw new IllegalArgumentException("Column type is empty."); + } + + if (columnType.contains("<") && columnType.contains(">")) { + return columnType.substring(0, columnType.indexOf("<")); + } + + if (columnType.contains("(") && columnType.contains(")")) { + return columnType.substring(0, columnType.indexOf("(")); + } + + return columnType; + } + + private static int[] getPrecisionAndScale(String decimalTypeDefinition) { + // Remove the "DECIMALV3" part and the parentheses + decimalTypeDefinition = decimalTypeDefinition.toUpperCase(Locale.ROOT); + String numericPart = decimalTypeDefinition.replace("DECIMALV3(", "").replace(")", ""); + numericPart = numericPart.replace("DECIMAL(", "").replace(")", ""); + + // Split by comma to separate precision and scale + String[] parts = numericPart.split(","); + + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid DECIMAL definition: " + decimalTypeDefinition); + } + + // Parse precision and scale from the split parts + int precision = Integer.parseInt(parts[0].trim()); + int scale = Integer.parseInt(parts[1].trim()); + + // Return an array containing precision and scale + return new int[] {precision, scale}; + } + + private void reconvertBuildArrayInternal( + SeaTunnelDataType elementType, + BasicTypeDefine.BasicTypeDefineBuilder builder, + String columnName) { + switch (elementType.getSqlType()) { + case BOOLEAN: + builder.columnType(SR_BOOLEAN_ARRAY); + builder.dataType(SR_BOOLEAN_ARRAY); + 
break; + case TINYINT: + builder.columnType(SR_TINYINT_ARRAY); + builder.dataType(SR_TINYINT_ARRAY); + break; + case SMALLINT: + builder.columnType(SR_SMALLINT_ARRAY); + builder.dataType(SR_SMALLINT_ARRAY); + break; + case INT: + builder.columnType(SR_INT_ARRAY); + builder.dataType(SR_INT_ARRAY); + break; + case BIGINT: + builder.columnType(SR_BIGINT_ARRAY); + builder.dataType(SR_BIGINT_ARRAY); + break; + case FLOAT: + builder.columnType(SR_FLOAT_ARRAY); + builder.dataType(SR_FLOAT_ARRAY); + break; + case DOUBLE: + builder.columnType(SR_DOUBLE_ARRAY); + builder.dataType(SR_DOUBLE_ARRAY); + break; + case DECIMAL: + int[] precisionAndScale = getPrecisionAndScale(elementType.toString()); + builder.columnType( + String.format( + SR_DECIMALV3_ARRAY_COLUMN_TYPE_TMP, + precisionAndScale[0], + precisionAndScale[1])); + builder.dataType(SR_DECIMALV3_ARRAY); + break; + case STRING: + case TIME: + builder.columnType(SR_STRING_ARRAY); + builder.dataType(SR_STRING_ARRAY); + break; + case DATE: + builder.columnType(SR_DATEV2_ARRAY); + builder.dataType(SR_DATEV2_ARRAY); + break; + case TIMESTAMP: + builder.columnType(SR_DATETIMEV2_ARRAY); + builder.dataType(SR_DATETIMEV2_ARRAY); + break; + default: + throw CommonError.convertToConnectorTypeError( + identifier(), elementType.getSqlType().name(), columnName); + } + } + + private void reconvertString( + Column column, BasicTypeDefine.BasicTypeDefineBuilder builder) { + // source is doris too. 
+ if (column.getSourceType() != null && column.getSourceType().equalsIgnoreCase(SR_JSON)) { + // Compatible with Doris 1.x and Doris 2.x versions + builder.columnType(SR_JSON); + builder.dataType(SR_JSON); + return; + } + + sampleReconvertString(column, builder); + } + + protected void sampleReconvertString( + Column column, BasicTypeDefine.BasicTypeDefineBuilder builder) { + if (column.getColumnLength() == null || column.getColumnLength() <= 0) { + builder.columnType(SR_STRING); + builder.dataType(SR_STRING); + return; + } + + if (column.getColumnLength() < POWER_2_8) { + if (column.getSourceType() != null + && column.getSourceType().toUpperCase(Locale.ROOT).startsWith(SR_VARCHAR)) { + builder.columnType(String.format("%s(%s)", SR_VARCHAR, column.getColumnLength())); + builder.dataType(SR_VARCHAR); + } else { + builder.columnType(String.format("%s(%s)", SR_CHAR, column.getColumnLength())); + builder.dataType(SR_CHAR); + } + return; + } + + if (column.getColumnLength() <= 65533) { + builder.columnType(String.format("%s(%s)", SR_VARCHAR, column.getColumnLength())); + builder.dataType(SR_VARCHAR); + return; + } + + if (column.getColumnLength() <= MAX_STRING_LENGTH) { + builder.columnType(SR_STRING); + builder.dataType(SR_STRING); + return; + } + + log.warn( + String.format( + "The String type in StarRocks can only store up to 2GB bytes, and the current field [%s] length is [%s] bytes. 
If it is greater than the maximum length of the String in StarRocks, it may not be able to write data", + column.getName(), column.getColumnLength())); + builder.columnType(SR_STRING); + builder.dataType(SR_STRING); + } + + private void reconvertMap( + Column column, BasicTypeDefine.BasicTypeDefineBuilder builder) { + MapType dataType = (MapType) column.getDataType(); + SeaTunnelDataType keyType = dataType.getKeyType(); + SeaTunnelDataType valueType = dataType.getValueType(); + Column keyColumn = + PhysicalColumn.of( + column.getName() + ".key", + (SeaTunnelDataType) keyType, + (Long) null, + true, + null, + null); + String keyColumnType = reconvert(keyColumn).getColumnType(); + + Column valueColumn = + PhysicalColumn.of( + column.getName() + ".value", + (SeaTunnelDataType) valueType, + (Long) null, + true, + null, + null); + String valueColumnType = reconvert(valueColumn).getColumnType(); + + builder.dataType(String.format(SR_MAP_COLUMN_TYPE, keyColumnType, valueColumnType)); + builder.columnType(String.format(SR_MAP_COLUMN_TYPE, keyColumnType, valueColumnType)); + } +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java index 61f279fc126..6008738cc3b 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java @@ -17,39 +17,41 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; import org.apache.seatunnel.api.sink.SaveModeHandler; 
import org.apache.seatunnel.api.sink.SchemaSaveMode; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.schema.SchemaChangeType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog; import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalogFactory; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; +import java.util.Arrays; +import java.util.List; import java.util.Optional; public class StarRocksSink extends AbstractSimpleSink - implements SupportSaveMode { + implements SupportSaveMode, SupportSchemaEvolutionSink { - private final SeaTunnelRowType seaTunnelRowType; + private final TableSchema tableSchema; private final SinkConfig sinkConfig; private final DataSaveMode dataSaveMode; private final SchemaSaveMode schemaSaveMode; private final CatalogTable catalogTable; - public StarRocksSink( - SinkConfig sinkConfig, CatalogTable catalogTable, ReadonlyConfig readonlyConfig) { + public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) { this.sinkConfig = sinkConfig; - this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); + this.tableSchema = catalogTable.getTableSchema(); this.catalogTable = catalogTable; this.dataSaveMode = sinkConfig.getDataSaveMode(); this.schemaSaveMode = 
sinkConfig.getSchemaSaveMode(); @@ -62,7 +64,8 @@ public String getPluginName() { @Override public AbstractSinkWriter createWriter(SinkWriter.Context context) { - return new StarRocksSinkWriter(sinkConfig, seaTunnelRowType); + TablePath sinkTablePath = catalogTable.getTablePath(); + return new StarRocksSinkWriter(sinkConfig, tableSchema, sinkTablePath); } @Override @@ -93,4 +96,13 @@ public Optional getSaveModeHandler() { public Optional getWriteCatalogTable() { return Optional.of(catalogTable); } + + @Override + public List supports() { + return Arrays.asList( + SchemaChangeType.ADD_COLUMN, + SchemaChangeType.DROP_COLUMN, + SchemaChangeType.RENAME_COLUMN, + SchemaChangeType.UPDATE_COLUMN); + } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java index 51f7486569b..bc851a91ed3 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java @@ -100,6 +100,6 @@ public TableSink createSink(TableSinkFactoryContext context) { catalogTable.getPartitionKeys(), catalogTable.getCatalogName()); - return () -> new StarRocksSink(sinkConfig, finalCatalogTable, context.getOptions()); + return () -> new StarRocksSink(sinkConfig, finalCatalogTable); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java index 06f70d09f90..d3b616c50c5 100644 --- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java @@ -17,9 +17,23 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; +import org.apache.seatunnel.api.event.EventType; +import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksSinkManager; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; @@ -27,31 +41,39 @@ import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksCsvSerializer; import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksISerializer; import 
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer; -import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksSinkOP; +import org.apache.seatunnel.connectors.seatunnel.starrocks.util.SchemaUtils; + +import org.apache.commons.lang3.StringUtils; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.util.Arrays; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; @Slf4j -public class StarRocksSinkWriter extends AbstractSinkWriter { +public class StarRocksSinkWriter extends AbstractSinkWriter + implements SupportSchemaEvolutionSinkWriter { - private final StarRocksISerializer serializer; - private final StarRocksSinkManager manager; + private StarRocksISerializer serializer; + private StarRocksSinkManager manager; + private TableSchema tableSchema; + private final SinkConfig sinkConfig; + private TablePath sinkTablePath; - public StarRocksSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) { - List fieldNames = - Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList()); - if (sinkConfig.isEnableUpsertDelete()) { - fieldNames.add(StarRocksSinkOP.COLUMN_KEY); - } + public StarRocksSinkWriter( + SinkConfig sinkConfig, TableSchema tableSchema, TablePath tablePath) { + this.tableSchema = tableSchema; + SeaTunnelRowType seaTunnelRowType = tableSchema.toPhysicalRowDataType(); this.serializer = createSerializer(sinkConfig, seaTunnelRowType); - this.manager = new StarRocksSinkManager(sinkConfig, fieldNames); + this.manager = new StarRocksSinkManager(sinkConfig, tableSchema); + this.sinkConfig = sinkConfig; + this.sinkTablePath = tablePath; } @Override @@ -68,6 +90,100 @@ record = serializer.serialize(element); manager.write(record); } + @Override + public void 
applySchemaChange(SchemaChangeEvent event) throws IOException { + if (event instanceof AlterTableColumnsEvent) { + AlterTableColumnsEvent alterTableColumnsEvent = (AlterTableColumnsEvent) event; + List events = alterTableColumnsEvent.getEvents(); + for (AlterTableColumnEvent alterTableColumnEvent : events) { + String sourceDialectName = alterTableColumnEvent.getSourceDialectName(); + if (StringUtils.isBlank(sourceDialectName)) { + throw new SeaTunnelException( + "The sourceDialectName in AlterTableColumnEvent can not be empty. event: " + + event); + } + processSchemaChangeEvent(alterTableColumnEvent); + } + } else { + log.warn("We only support AlterTableColumnsEvent, but actual event is " + event); + } + + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + try { + Connection conn = + DriverManager.getConnection( + sinkConfig.getJdbcUrl(), + sinkConfig.getUsername(), + sinkConfig.getPassword()); + SchemaUtils.applySchemaChange(event, conn, sinkTablePath); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed connecting to %s via JDBC.", sinkConfig.getJdbcUrl()), e); + } + } + + protected void processSchemaChangeEvent(AlterTableColumnEvent event) throws IOException { + List columns = new ArrayList<>(tableSchema.getColumns()); + switch (event.getEventType()) { + case SCHEMA_CHANGE_ADD_COLUMN: + AlterTableAddColumnEvent alterTableAddColumnEvent = + (AlterTableAddColumnEvent) event; + Column addColumn = alterTableAddColumnEvent.getColumn(); + String afterColumn = alterTableAddColumnEvent.getAfterColumn(); + if (StringUtils.isNotBlank(afterColumn)) { + Optional columnOptional = + columns.stream() + .filter(column -> afterColumn.equals(column.getName())) + .findFirst(); + if (!columnOptional.isPresent()) { + columns.add(addColumn); + break; + } + columnOptional.ifPresent( + column -> { + int index = columns.indexOf(column); + columns.add(index + 1, addColumn); + 
}); + } else { + columns.add(addColumn); + } + break; + case SCHEMA_CHANGE_DROP_COLUMN: + String dropColumn = ((AlterTableDropColumnEvent) event).getColumn(); + columns.removeIf(column -> column.getName().equalsIgnoreCase(dropColumn)); + break; + case SCHEMA_CHANGE_MODIFY_COLUMN: + Column modifyColumn = ((AlterTableModifyColumnEvent) event).getColumn(); + replaceColumnByIndex( + event.getEventType(), columns, modifyColumn.getName(), modifyColumn); + break; + case SCHEMA_CHANGE_CHANGE_COLUMN: + AlterTableChangeColumnEvent alterTableChangeColumnEvent = + (AlterTableChangeColumnEvent) event; + Column changeColumn = alterTableChangeColumnEvent.getColumn(); + String oldColumnName = alterTableChangeColumnEvent.getOldColumn(); + replaceColumnByIndex(event.getEventType(), columns, oldColumnName, changeColumn); + break; + default: + throw new SeaTunnelException( + "Unsupported schemaChangeEvent for event type: " + event.getEventType()); + } + this.tableSchema = + TableSchema.builder() + .columns(columns) + .primaryKey(tableSchema.getPrimaryKey()) + .constraintKey(tableSchema.getConstraintKeys()) + .build(); + SeaTunnelRowType seaTunnelRowType = tableSchema.toPhysicalRowDataType(); + this.serializer = createSerializer(sinkConfig, seaTunnelRowType); + this.manager = new StarRocksSinkManager(sinkConfig, tableSchema); + } + @SneakyThrows @Override public Optional prepareCommit() { @@ -89,7 +205,7 @@ public void close() throws IOException { } } - public static StarRocksISerializer createSerializer( + public StarRocksISerializer createSerializer( SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) { if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { return new StarRocksCsvSerializer( @@ -104,4 +220,20 @@ public static StarRocksISerializer createSerializer( CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Failed to create row serializer, unsupported `format` from stream load properties."); } + + protected void replaceColumnByIndex( + EventType eventType, 
List columns, String oldColumnName, Column newColumn) { + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + if (column.getName().equalsIgnoreCase(oldColumnName)) { + // rename ...... to ...... which just has column name + if (eventType.equals(EventType.SCHEMA_CHANGE_CHANGE_COLUMN) + && newColumn.getDataType() == null) { + columns.set(i, column.rename(newColumn.getName())); + } else { + columns.set(i, newColumn); + } + } + } + } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/SchemaUtils.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/SchemaUtils.java new file mode 100644 index 00000000000..ac16fd40fe7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/SchemaUtils.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.util; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType; +import org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksTypeConverter; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +@Slf4j +public class SchemaUtils { + + private SchemaUtils() {} + + /** + * Refresh physical table schema by schema change event + * + * @param event schema change event + * @param connection jdbc connection + * @param tablePath sink table path + */ + public static void applySchemaChange( + SchemaChangeEvent event, Connection connection, TablePath tablePath) + throws SQLException { + if (event instanceof AlterTableColumnsEvent) { + for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent) event).getEvents()) { + applySchemaChange(columnEvent, connection, tablePath); + } + } else { + if (event instanceof AlterTableChangeColumnEvent) { + AlterTableChangeColumnEvent changeColumnEvent = (AlterTableChangeColumnEvent) event; + if (!changeColumnEvent + .getOldColumn() + 
.equals(changeColumnEvent.getColumn().getName())) { + if (!columnExists(connection, tablePath, changeColumnEvent.getOldColumn()) + && columnExists( + connection, + tablePath, + changeColumnEvent.getColumn().getName())) { + log.warn( + "Column {} already exists in table {}. Skipping change column operation. event: {}", + changeColumnEvent.getColumn().getName(), + tablePath.getFullName(), + event); + return; + } + } + applySchemaChange(connection, tablePath, changeColumnEvent); + } else if (event instanceof AlterTableModifyColumnEvent) { + applySchemaChange(connection, tablePath, (AlterTableModifyColumnEvent) event); + } else if (event instanceof AlterTableAddColumnEvent) { + AlterTableAddColumnEvent addColumnEvent = (AlterTableAddColumnEvent) event; + if (columnExists(connection, tablePath, addColumnEvent.getColumn().getName())) { + log.warn( + "Column {} already exists in table {}. Skipping add column operation. event: {}", + addColumnEvent.getColumn().getName(), + tablePath.getFullName(), + event); + return; + } + applySchemaChange(connection, tablePath, addColumnEvent); + } else if (event instanceof AlterTableDropColumnEvent) { + AlterTableDropColumnEvent dropColumnEvent = (AlterTableDropColumnEvent) event; + if (!columnExists(connection, tablePath, dropColumnEvent.getColumn())) { + log.warn( + "Column {} does not exist in table {}. Skipping drop column operation. 
event: {}", + dropColumnEvent.getColumn(), + tablePath.getFullName(), + event); + return; + } + applySchemaChange(connection, tablePath, dropColumnEvent); + } else { + throw new SeaTunnelException( + "Unsupported schemaChangeEvent : " + event.getEventType()); + } + } + } + + public static void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableChangeColumnEvent event) + throws SQLException { + String tableIdentifierWithQuoted = tablePath.getFullName(); + Column changeColumn = event.getColumn(); + String oldColumnName = event.getOldColumn(); + String afterColumn = event.getAfterColumn(); + String changeColumnSQL = + buildAlterTableSql( + AlterType.RENAME.name(), + changeColumn, + tableIdentifierWithQuoted, + oldColumnName, + afterColumn); + try (Statement statement = connection.createStatement()) { + log.info("Executing change column SQL: " + changeColumnSQL); + statement.execute(changeColumnSQL); + } + } + + public static void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableModifyColumnEvent event) + throws SQLException { + String tableIdentifierWithQuoted = tablePath.getFullName(); + Column modifyColumn = event.getColumn(); + String afterColumn = event.getAfterColumn(); + String modifyColumnSQL = + buildAlterTableSql( + AlterType.MODIFY.name(), + modifyColumn, + tableIdentifierWithQuoted, + StringUtils.EMPTY, + afterColumn); + + try (Statement statement = connection.createStatement()) { + log.info("Executing modify column SQL: " + modifyColumnSQL); + statement.execute(modifyColumnSQL); + } + } + + public static void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableAddColumnEvent event) + throws SQLException { + String tableIdentifierWithQuoted = tablePath.getFullName(); + Column addColumn = event.getColumn(); + String afterColumn = event.getAfterColumn(); + String addColumnSQL = + buildAlterTableSql( + AlterType.ADD.name(), + addColumn, + tableIdentifierWithQuoted, + StringUtils.EMPTY, + 
afterColumn); + try (Statement statement = connection.createStatement()) { + log.info("Executing add column SQL: " + addColumnSQL); + statement.execute(addColumnSQL); + } + } + + public static void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableDropColumnEvent event) + throws SQLException { + String tableIdentifierWithQuoted = tablePath.getFullName(); + String dropColumn = event.getColumn(); + String dropColumnSQL = + buildAlterTableSql( + AlterType.DROP.name(), null, tableIdentifierWithQuoted, dropColumn, null); + try (Statement statement = connection.createStatement()) { + log.info("Executing drop column SQL: " + dropColumnSQL); + statement.execute(dropColumnSQL); + } + } + + /** + * build alter table sql + * + * @param alterOperation alter operation of ddl + * @param newColumn new column after ddl + * @param tableName table name of sink table + * @param oldColumnName old column name before ddl + * @param afterColumn column before the new column + * @return alter table sql for sink table after schema change + */ + public static String buildAlterTableSql( + String alterOperation, + Column newColumn, + String tableName, + String oldColumnName, + String afterColumn) { + if (StringUtils.equals(alterOperation, AlterType.DROP.name())) { + return String.format( + "ALTER TABLE %s DROP COLUMN %s", tableName, quoteIdentifier(oldColumnName)); + } + + if (alterOperation.equalsIgnoreCase(AlterType.RENAME.name())) { + return String.format( + "ALTER TABLE %s RENAME COLUMN %s TO %s", + tableName, oldColumnName, newColumn.getName()); + } + + BasicTypeDefine typeDefine = + StarRocksTypeConverter.INSTANCE.reconvert(newColumn); + String basicSql = buildAlterTableBasicSql(alterOperation, tableName); + basicSql = decorateWithColumnNameAndType(basicSql, newColumn, typeDefine.getColumnType()); + basicSql = decorateWithComment(basicSql, newColumn.getComment()); + basicSql = decorateWithAfterColumn(basicSql, afterColumn); + return basicSql + ";"; + } + + /** + 
* Check if the column exists in the table + * + * @param connection + * @param tablePath + * @param column + * @return + */ + public static boolean columnExists(Connection connection, TablePath tablePath, String column) { + String selectColumnSQL = + String.format( + "SELECT %s FROM %s WHERE 1 != 1", + quoteIdentifier(column), tablePath.getTableName()); + try (Statement statement = connection.createStatement()) { + return statement.execute(selectColumnSQL); + } catch (SQLException e) { + log.debug("Column {} does not exist in table {}", column, tablePath.getFullName(), e); + return false; + } + } + + /** + * decorate the sql with column name and type + * + * @param basicSql basic sql of alter table for sink table + * @param newColumn new column after ddl + * @param columnType column type of new column + * @return basic sql with column name and type of alter table for sink table + */ + public static String decorateWithColumnNameAndType( + String basicSql, Column newColumn, String columnType) { + StringBuilder sql = new StringBuilder(basicSql); + String newColumnNameWithQuoted = quoteIdentifier(newColumn.getName()); + sql.append(newColumnNameWithQuoted).append(StringUtils.SPACE); + sql.append(columnType).append(StringUtils.SPACE); + return sql.toString(); + } + + /** + * build the body of alter table sql + * + * @param alterOperation alter operation of ddl + * @param tableName table name of sink table + * @return basic sql of alter table for sink table + */ + public static String buildAlterTableBasicSql(String alterOperation, String tableName) { + StringBuilder sql = + new StringBuilder( + "ALTER TABLE " + + tableName + + StringUtils.SPACE + + alterOperation + + StringUtils.SPACE + + "COLUMN" + + StringUtils.SPACE); + return sql.toString(); + } + + /** + * decorate with comment + * + * @param basicSql alter table sql for sink table + * @param comment comment of new column + * @return alter table sql with comment for sink table + */ + public static String 
decorateWithComment(String basicSql, String comment) { + StringBuilder sql = new StringBuilder(basicSql); + if (StringUtils.isNotBlank(comment)) { + sql.append("COMMENT '").append(comment).append("'"); + } + return sql.toString(); + } + + /** + * decorate with after + * + * @param basicSql alter table sql for sink table + * @param afterColumn column before the new column + * @return alter table sql with after for sink table + */ + public static String decorateWithAfterColumn(String basicSql, String afterColumn) { + StringBuilder sql = new StringBuilder(basicSql); + if (StringUtils.isNotBlank(afterColumn)) { + sql.append("AFTER ").append(afterColumn).append(StringUtils.SPACE); + } + return sql.toString(); + } + + public static String quoteIdentifier(String identifier) { + return "`" + identifier + "`"; + } + + enum AlterType { + ADD, + DROP, + MODIFY, + RENAME + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml index 38e4f7eabee..7f11b08d8b2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml @@ -48,5 +48,25 @@ ${mysql.version} test + + org.apache.seatunnel + connector-cdc-mysql + ${project.version} + test-jar + test + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + + + org.testcontainers + mysql + ${testcontainer.version} + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java new file mode 100644 index 00000000000..6207f673aa9 --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.starrocks; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import 
org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.given; + +@Slf4j +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Currently SPARK do not support cdc. 
In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") +public class StarRocksSchemaChangeIT extends TestSuiteBase implements TestResource { + private static final String DATABASE = "shop"; + private static final String SOURCE_TABLE = "products"; + private static final String MYSQL_HOST = "mysql_cdc_e2e"; + private static final String MYSQL_USER_NAME = "mysqluser"; + private static final String MYSQL_USER_PASSWORD = "mysqlpw"; + + private static final String DOCKER_IMAGE = "starrocks/allin1-ubuntu:3.3.4"; + private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + private static final String HOST = "starrocks_cdc_e2e"; + private static final int SR_PROXY_PORT = 8080; + private static final int QUERY_PORT = 9030; + private static final int HTTP_PORT = 8030; + private static final int BE_HTTP_PORT = 8040; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private static final String SINK_TABLE = "products"; + private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE; + private static final String SR_DRIVER_JAR = + "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + + private Connection starRocksConnection; + private Connection mysqlConnection; + private GenericContainer starRocksServer; + + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private static final String QUERY = "select * from %s.%s order by id"; + private static final String QUERY_COLUMNS = + "SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' ORDER by COLUMN_NAME;"; + private static final String PROJECTION_QUERY = + "select id,name,description,weight,add_column1,add_column2,add_column3 from %s.%s order by id;"; + + private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0); + + private final UniqueDatabase shopDatabase = + new UniqueDatabase(MYSQL_CONTAINER, DATABASE, "mysqluser", "mysqlpw", DATABASE); + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + SR_DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + return new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName(DATABASE) + .withUsername(MYSQL_USER_NAME) + .withPassword(MYSQL_USER_PASSWORD) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image"))); + } + + private void initializeJdbcConnection() throws Exception { + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(SR_DRIVER_JAR)}, + StarRocksCDCSinkIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); + Properties props = new Properties(); + props.put("user", USERNAME); + props.put("password", PASSWORD); + starRocksConnection = + driver.connect( + String.format("jdbc:mysql://%s:%s", starRocksServer.getHost(), QUERY_PORT), + props); + } + + private void initializeStarRocksServer() { + starRocksServer = + new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE))); + starRocksServer.setPortBindings( + Lists.newArrayList( + String.format("%s:%s", QUERY_PORT, QUERY_PORT), + String.format("%s:%s", HTTP_PORT, 
HTTP_PORT), + String.format("%s:%s", BE_HTTP_PORT, BE_HTTP_PORT))); + Startables.deepStart(Stream.of(starRocksServer)).join(); + log.info("StarRocks container started"); + // wait for starrocks fully start + given().ignoreExceptions() + .await() + .atMost(360, TimeUnit.SECONDS) + .untilAsserted(this::initializeJdbcConnection); + } + + @TestTemplate + public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container) + throws InterruptedException, IOException { + String jobId = String.valueOf(JobIdGenerator.newJobId()); + String jobConfigFile = "/mysqlcdc_to_starrocks_with_schema_change.conf"; + CompletableFuture.runAsync( + () -> { + try { + container.executeJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + TimeUnit.SECONDS.sleep(20); + // waiting for case1 completed + assertSchemaEvolutionForAddColumns( + DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); + + // savepoint 1 + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + + // case2 drop columns with cdc data at same time + shopDatabase.setTemplateName("drop_columns").createAndInitialize(); + + // restore 1 + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // waiting for case2 completed + assertTableStructureAndData( + DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); + + // savepoint 2 + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + + // case3 change column name with cdc data at same time + shopDatabase.setTemplateName("change_columns").createAndInitialize(); + + // case4 modify column data type with cdc data at same time + shopDatabase.setTemplateName("modify_columns").createAndInitialize(); + + // restore 
2 + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // waiting for case3/case4 completed + assertTableStructureAndData( + DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); + } + + private void assertSchemaEvolutionForAddColumns( + String database, + String sourceTable, + String sinkTable, + Connection sourceConnection, + Connection sinkConnection) { + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY, database, sourceTable), + sourceConnection), + query( + String.format(QUERY, database, sinkTable), + sinkConnection))); + + // case1 add columns with cdc data at same time + shopDatabase.setTemplateName("add_columns").createAndInitialize(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY_COLUMNS, database, sourceTable), + sourceConnection), + query( + String.format(QUERY_COLUMNS, database, sinkTable), + sinkConnection))); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + String.format( + QUERY.replaceAll( + "order by id", + "where id >= 128 order by id"), + database, + sourceTable), + sourceConnection), + query( + String.format( + QUERY.replaceAll( + "order by id", + "where id >= 128 order by id"), + database, + sinkTable), + sinkConnection)); + }); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + String.format(PROJECTION_QUERY, database, sourceTable), + sourceConnection), + query( + String.format(PROJECTION_QUERY, database, sinkTable), + sinkConnection)); + }); + } + + private void assertTableStructureAndData( + String database, + String 
sourceTable, + String sinkTable, + Connection sourceConnection, + Connection sinkConnection) { + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY_COLUMNS, database, sourceTable), + sourceConnection), + query( + String.format(QUERY_COLUMNS, database, sinkTable), + sinkConnection))); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY, database, sourceTable), + sourceConnection), + query( + String.format(QUERY, database, sinkTable), + sinkConnection))); + } + + private Connection getMysqlJdbcConnection() throws SQLException { + return DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + } + + @BeforeAll + @Override + public void startUp() throws SQLException { + initializeStarRocksServer(); + log.info("The second stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + log.info("Mysql Containers are started"); + shopDatabase.createAndInitialize(); + log.info("Mysql ddl execution is complete"); + initializeJdbcTable(); + mysqlConnection = getMysqlJdbcConnection(); + } + + @AfterAll + @Override + public void tearDown() throws SQLException { + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.close(); + } + if (starRocksServer != null) { + starRocksServer.close(); + } + if (starRocksConnection != null) { + starRocksConnection.close(); + } + if (mysqlConnection != null) { + mysqlConnection.close(); + } + } + + private void initializeJdbcTable() { + try (Statement statement = starRocksConnection.createStatement()) { + // create databases + statement.execute(CREATE_DATABASE); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } + + private List> query(String sql, Connection connection) { + try { + ResultSet resultSet = 
connection.createStatement().executeQuery(sql); + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + if (resultSet.getObject(i) instanceof Timestamp) { + Timestamp timestamp = resultSet.getTimestamp(i); + objects.add(timestamp.toLocalDateTime().format(DATE_TIME_FORMATTER)); + break; + } + if (resultSet.getObject(i) instanceof LocalDateTime) { + LocalDateTime localDateTime = resultSet.getObject(i, LocalDateTime.class); + objects.add(localDateTime.format(DATE_TIME_FORMATTER)); + break; + } + objects.add(resultSet.getObject(i)); + } + log.debug(String.format("Print query, sql: %s, data: %s", sql, objects)); + result.add(objects); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/add_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/add_columns.sql new file mode 100644 index 00000000000..2a1212aa95d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/add_columns.sql @@ -0,0 +1,83 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; +INSERT INTO products +VALUES (110,"scooter","Small 2-wheel scooter",3.14), + (111,"car battery","12V car battery",8.1), + (112,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (113,"hammer","12oz carpenter's hammer",0.75), + (114,"hammer","14oz carpenter's hammer",0.875), + (115,"hammer","16oz carpenter's hammer",1.0), + (116,"rocks","box of assorted rocks",5.3), + (117,"jacket","water resistent black wind breaker",0.1), + (118,"spare tire","24 inch spare tire",22.2); +update products set name = 'dailai' where id = 101; +delete from products where id = 102; + +alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1; +update products set add_column1 = 'swm1', add_column2 = 2; + +update products set name = 'dailai' where id = 110; +insert into products +values (119,"scooter","Small 2-wheel scooter",3.14,'xx',1), + (120,"car battery","12V car battery",8.1,'xx',2), + (121,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3), + (122,"hammer","12oz carpenter's hammer",0.75,'xx',4), + (123,"hammer","14oz carpenter's hammer",0.875,'xx',5), + (124,"hammer","16oz carpenter's hammer",1.0,'xx',6), + (125,"rocks","box 
of assorted rocks",5.3,'xx',7), + (126,"jacket","water resistent black wind breaker",0.1,'xx',8), + (127,"spare tire","24 inch spare tire",22.2,'xx',9); +delete from products where id = 118; + +alter table products ADD COLUMN add_column3 float not null default 1.1; +update products set add_column3 = 3.3; +alter table products ADD COLUMN add_column4 timestamp not null default current_timestamp(); +update products set add_column4 = current_timestamp(); + +delete from products where id = 113; +insert into products +values (128,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:09'), + (129,"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02 09:09:09'), + (130,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'), + (131,"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'), + (132,"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'), + (133,"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02 09:09:09'), + (134,"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02 09:09:09'), + (135,"jacket","water resistent black wind breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'), + (136,"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 09:09:09'); +update products set name = 'dailai' where id = 135; + +alter table products ADD COLUMN add_column6 varchar(64) not null default 'ff' after id; +update products set add_column6 = 'swm6'; + +delete from products where id = 115; +insert into products +values (173,'tt',"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:09'), + (174,'tt',"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02 09:09:09'), + (175,'tt',"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'), + (176,'tt',"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'), + (177,'tt',"hammer","14oz carpenter's 
hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'), + (178,'tt',"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02 09:09:09'), + (179,'tt',"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02 09:09:09'), + (180,'tt',"jacket","water resistent black wind breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'), + (181,'tt',"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 09:09:09'); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/change_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/change_columns.sql new file mode 100644 index 00000000000..a17f9a0a936 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/change_columns.sql @@ -0,0 +1,36 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products change add_column2 add_column int default 1 not null; +delete from products where id < 155; +insert into products +values (155,"scooter","Small 2-wheel scooter",3.14,1), + (156,"car battery","12V car battery",8.1,2), + (157,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (158,"hammer","12oz carpenter's hammer",0.75,4), + (159,"hammer","14oz carpenter's hammer",0.875,5), + (160,"hammer","16oz carpenter's hammer",1.0,6), + (161,"rocks","box of assorted rocks",5.3,7), + (162,"jacket","water resistent black wind breaker",0.1,8), + (163,"spare tire","24 inch spare tire",22.2,9); + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/drop_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/drop_columns.sql new file mode 100644 index 00000000000..5c3b7d1f549 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/drop_columns.sql @@ -0,0 +1,50 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products drop column add_column4,drop column add_column6; +insert into products +values (137,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1), + (138,"car battery","12V car battery",8.1,'xx',2,1.2), + (139,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3), + (140,"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4), + (141,"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5), + (142,"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6), + (143,"rocks","box of assorted rocks",5.3,'xx',7,1.7), + (144,"jacket","water resistent black wind breaker",0.1,'xx',8,1.8), + (145,"spare tire","24 inch spare tire",22.2,'xx',9,1.9); +update products set name = 'dailai' where id in (140,141,142); +delete from products where id < 137; + + +alter table products drop column add_column1,drop column add_column3; +insert into products +values (146,"scooter","Small 2-wheel scooter",3.14,1), + (147,"car battery","12V car battery",8.1,2), + (148,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (149,"hammer","12oz carpenter's hammer",0.75,4), + (150,"hammer","14oz carpenter's hammer",0.875,5), + (151,"hammer","16oz carpenter's hammer",1.0,6), + (152,"rocks","box of assorted 
rocks",5.3,7), + (153,"jacket","water resistent black wind breaker",0.1,8), + (154,"spare tire","24 inch spare tire",22.2,9); +update products set name = 'dailai' where id > 143; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/modify_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/modify_columns.sql new file mode 100644 index 00000000000..ab64c47567b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/modify_columns.sql @@ -0,0 +1,36 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products modify name longtext null; +delete from products where id < 155; +insert into products +values (164,"scooter","Small 2-wheel scooter",3.14,1), + (165,"car battery","12V car battery",8.1,2), + (166,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (167,"hammer","12oz carpenter's hammer",0.75,4), + (168,"hammer","14oz carpenter's hammer",0.875,5), + (169,"hammer","16oz carpenter's hammer",1.0,6), + (170,"rocks","box of assorted rocks",5.3,7), + (171,"jacket","water resistent black wind breaker",0.1,8), + (172,"spare tire","24 inch spare tire",22.2,9); + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql new file mode 100644 index 00000000000..be2eaaeca9e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql @@ -0,0 +1,44 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +drop table if exists products; +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); + +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (101,"scooter","Small 2-wheel scooter",3.14), + (102,"car battery","12V car battery",8.1), + (103,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (104,"hammer","12oz carpenter's hammer",0.75), + (105,"hammer","14oz carpenter's hammer",0.875), + (106,"hammer","16oz carpenter's hammer",1.0), + (107,"rocks","box of assorted rocks",5.3), + (108,"jacket","water resistent black wind breaker",0.1), + (109,"spare tire","24 inch spare tire",22.2); \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/docker/server-gtids/my.cnf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/docker/server-gtids/my.cnf new file mode 100644 index 00000000000..a390897885d --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/docker/server-gtids/my.cnf @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. 
+# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/docker/setup.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/docker/setup.sql new file mode 100644 index 00000000000..495bda2c7a1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/docker/setup.sql @@ -0,0 +1,29 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- In production you would almost certainly restrict the replication user to the follower (replica) machine, +-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'. +-- However, in this database we'll grant 2 users different privileges: +-- +-- 1) 'mysqluser' - all privileges +-- 2) 'st_user_source' - all privileges required by the snapshot reader AND binlog reader (used for testing) +-- +GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%'; + +CREATE USER 'st_user_source' IDENTIFIED BY 'mysqlpw'; +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, DROP, LOCK TABLES ON *.* TO 'st_user_source'@'%'; +-- ---------------------------------------------------------------------------------------------------------------- diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf new file mode 100644 index 00000000000..ba3c03db1e1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf @@ -0,0 +1,70 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + MySQL-CDC { + username = "st_user_source" + password = "mysqlpw" + table-names = ["shop.products"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + debezium = { + include.schema.changes = true + } + } +} + +sink { + StarRocks { + # docker allin1 environment can use port 8080 8040 instead of port FE 8030 + nodeUrls = ["starrocks_cdc_e2e:8040"] + username = "root" + password = "" + database = "shop" + table = "${table_name}" + base-url = "jdbc:mysql://starrocks_cdc_e2e:9030/shop" + max_retries = 3 + enable_upsert_delete = true + schema_save_mode="RECREATE_SCHEMA" + data_save_mode="DROP_DATA" + save_mode_create_template = """ + CREATE TABLE IF NOT EXISTS shop.`${table_name}` ( + ${rowtype_primary_key}, + ${rowtype_fields} + ) ENGINE=OLAP + PRIMARY KEY (${rowtype_primary_key}) + DISTRIBUTED BY HASH (${rowtype_primary_key}) + PROPERTIES ( + "replication_num" = "1", + "in_memory" = "false", + "enable_persistent_index" = "true", + "replicated_storage" = "true", + "compression" = "LZ4" + ) + """ + } +}