Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] StarRocks-sink support schema evolution #8082

Open
wants to merge 28 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d272789
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode conflict…
jw-itq Aug 19, 2024
4ebc66b
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
6810d80
[Bug] [sink elasticsearch] the savemode of sink-es conflicts with es a…
jw-itq Aug 19, 2024
ac50c3e
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
6e8ccc9
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
3936038
[Doc] Add IGNORE savemode type into document #7443
jw-itq Aug 22, 2024
3bc24c2
Merge branch 'apache:dev' into dev
jw-itq Aug 22, 2024
c26e89d
Merge branch 'apache:dev' into dev
jw-itq Aug 26, 2024
b5f5162
Merge branch 'apache:dev' into dev
jw-itq Aug 28, 2024
0347da2
Merge branch 'apache:dev' into dev
jw-itq Sep 17, 2024
4bce27f
Merge branch 'apache:dev' into dev
jw-itq Nov 11, 2024
d4993f3
Merge branch 'apache:dev' into dev
jw-itq Nov 12, 2024
5d61e76
Merge branch 'apache:dev' into dev
jw-itq Nov 19, 2024
2723cb4
support schema evolution
jw-itq Nov 19, 2024
0b33432
change package name
jw-itq Nov 19, 2024
b00c5d0
fix
jw-itq Nov 19, 2024
5665556
fix
jw-itq Nov 19, 2024
12c8e68
fix
jw-itq Nov 19, 2024
abd304e
fix
jw-itq Nov 19, 2024
a947891
fix e2e test
jw-itq Nov 19, 2024
e17e1aa
fix e2e test
jw-itq Nov 19, 2024
3a05bf6
fix e2e test
jw-itq Nov 20, 2024
bb429c4
log level
jw-itq Nov 20, 2024
8f8e143
change log level
jw-itq Nov 20, 2024
0693041
fix fe port
jw-itq Nov 20, 2024
c586910
fix test case
jw-itq Nov 21, 2024
84b59ba
fix Implements SchemaEvolution API
jw-itq Nov 24, 2024
573ec9c
move SupportSchemaEvolutionSink to StarRocksSink
jw-itq Nov 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
Expand All @@ -42,10 +43,10 @@ public class StarRocksSinkManager {
private int batchRowCount = 0;
private long batchBytesSize = 0;

public StarRocksSinkManager(SinkConfig sinkConfig, List<String> fileNames) {
public StarRocksSinkManager(SinkConfig sinkConfig, TableSchema tableSchema) {
this.sinkConfig = sinkConfig;
this.batchList = new ArrayList<>();
starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, fileNames);
starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, tableSchema);
}

private void tryInit() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser;
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksSinkOP;

import org.apache.commons.codec.binary.Base64;

Expand Down Expand Up @@ -56,11 +59,11 @@ public class StarRocksStreamLoadVisitor {
private static final String RESULT_LABEL_ABORTED = "ABORTED";
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";

private List<String> fieldNames;
private final TableSchema tableSchema;

public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List<String> fieldNames) {
public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, TableSchema tableSchema) {
this.sinkConfig = sinkConfig;
this.fieldNames = fieldNames;
this.tableSchema = tableSchema;
this.httpHelper = new HttpHelper(sinkConfig);
}

Expand Down Expand Up @@ -260,16 +263,19 @@ private String getBasicAuthHeader(String username, String password) {

private Map<String, String> getStreamLoadHttpHeader(String label) {
Map<String, String> headerMap = new HashMap<>();
if (null != fieldNames
&& !fieldNames.isEmpty()
List<Column> columns = tableSchema.getColumns();
List<String> fieldNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
if (sinkConfig.isEnableUpsertDelete()) {
fieldNames.add(StarRocksSinkOP.COLUMN_KEY);
}
if (!fieldNames.isEmpty()
&& SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
headerMap.put(
"columns",
String.join(
",",
fieldNames.stream()
.map(f -> String.format("`%s`", f))
.collect(Collectors.toList())));
fieldNames.stream()
.map(f -> String.format("`%s`", f))
.collect(Collectors.joining(",")));
}
if (null != sinkConfig.getStreamLoadProps()) {
for (Map.Entry<String, Object> entry : sinkConfig.getStreamLoadProps().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * String constants for StarRocks column type names, array and map type spellings, and templates
 * containing {@code %s} placeholders, together with a single {@code type} value.
 *
 * <p>Lombok generates the getter for {@code type} ({@code @Getter}) and a constructor that takes
 * the {@code type} value ({@code @AllArgsConstructor}).
 */
@Getter
@AllArgsConstructor
public class StarRocksType {
// Scalar type names.
// NOTE(review): DECIMALV3 here, and DATEV2 / DATETIMEV2 / JSONB further down, look like names
// carried over from another connector - confirm that StarRocks accepts them.
public static final String SR_NULL = "NULL";
public static final String SR_BOOLEAN = "BOOLEAN";
public static final String SR_TINYINT = "TINYINT";
public static final String SR_SMALLINT = "SMALLINT";
public static final String SR_INT = "INT";
public static final String SR_BIGINT = "BIGINT";
public static final String SR_LARGEINT = "LARGEINT";
public static final String SR_FLOAT = "FLOAT";
public static final String SR_DOUBLE = "DOUBLE";
public static final String SR_DECIMAL = "DECIMAL";
public static final String SR_DECIMALV3 = "DECIMALV3";
public static final String SR_DATE = "DATE";
public static final String SR_DATETIME = "DATETIME";
public static final String SR_CHAR = "CHAR";
public static final String SR_VARCHAR = "VARCHAR";
public static final String SR_STRING = "STRING";

// Array type strings, one per element type. The *_COLUMN_TYPE_TMP variant is a template whose
// two %s placeholders are presumably the decimal precision and scale - confirm against callers.
// NOTE(review): SR_INT_ARRAY embeds "int(11)" while its siblings use the bare element name;
// verify that this difference is intentional.
public static final String SR_BOOLEAN_ARRAY = "ARRAY<boolean>";
public static final String SR_TINYINT_ARRAY = "ARRAY<tinyint>";
public static final String SR_SMALLINT_ARRAY = "ARRAY<smallint>";
public static final String SR_INT_ARRAY = "ARRAY<int(11)>";
public static final String SR_BIGINT_ARRAY = "ARRAY<bigint>";
public static final String SR_FLOAT_ARRAY = "ARRAY<float>";
public static final String SR_DOUBLE_ARRAY = "ARRAY<double>";
public static final String SR_DECIMALV3_ARRAY = "ARRAY<DECIMALV3>";
public static final String SR_DECIMALV3_ARRAY_COLUMN_TYPE_TMP = "ARRAY<DECIMALV3(%s, %s)>";
public static final String SR_DATEV2_ARRAY = "ARRAY<DATEV2>";
public static final String SR_DATETIMEV2_ARRAY = "ARRAY<DATETIMEV2>";
public static final String SR_STRING_ARRAY = "ARRAY<STRING>";

// The column length cannot be obtained from an array type, so the following array types
// cannot be generated properly.
public static final String SR_LARGEINT_ARRAY = "ARRAY<largeint>";
public static final String SR_CHAR_ARRAY = "ARRAY<CHAR>";
public static final String SR_CHAR_ARRAY_COLUMN_TYPE_TMP = "ARRAY<CHAR(%s)>";
public static final String SR_VARCHAR_ARRAY = "ARRAY<VARCHAR>";
public static final String SR_VARCHAR_ARRAY_COLUMN_TYPE_TMP = "ARRAY<VARCHAR(%s)>";

// JSON type names.
public static final String SR_JSON = "JSON";
public static final String SR_JSONB = "JSONB";

// Bare ARRAY type name.
public static final String SR_ARRAY = "ARRAY";

// NOTE(review): the *_INTER / *_PRE values look like the element-type spellings found inside
// array column definitions reported by the database - confirm against the code that parses
// them.
public static final String SR_ARRAY_BOOLEAN_INTER = "tinyint(1)";
public static final String SR_ARRAY_TINYINT_INTER = "tinyint(4)";
public static final String SR_ARRAY_SMALLINT_INTER = "smallint(6)";
public static final String SR_ARRAY_INT_INTER = "int(11)";
public static final String SR_ARRAY_BIGINT_INTER = "bigint(20)";
public static final String SR_ARRAY_DECIMAL_PRE = "DECIMAL";
public static final String SR_ARRAY_DATE_INTER = "date";
public static final String SR_ARRAY_DATEV2_INTER = "DATEV2";
public static final String SR_ARRAY_DATETIME_INTER = "DATETIME";
public static final String SR_ARRAY_DATETIMEV2_INTER = "DATETIMEV2";

// MAP type name and a template with two %s placeholders (presumably key type and value type).
public static final String SR_MAP = "MAP";
public static final String SR_MAP_COLUMN_TYPE = "MAP<%s, %s>";

// NOTE(review): "INDENTFIER" is presumably a misspelling of "IDENTIFIER"; the name is left
// unchanged because code outside this class may reference it.
public static final String SR_BOOLEAN_INDENTFIER = "TINYINT(1)";

// Type value supplied through the Lombok-generated constructor and read through getType().
private String type;
}
Loading
Loading