diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md
index 15d92f8c5fc..ae67ceb232d 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -59,8 +59,69 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| primary_key | String | No | - | Mark the primary key column from clickhouse table, and based on primary key execute INSERT/UPDATE/DELETE to clickhouse table. |
| support_upsert | Boolean | No | false | Support upsert row by query primary key. |
| allow_experimental_lightweight_delete | Boolean | No | false | Allow experimental lightweight delete based on `*MergeTree` table engine. |
+| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode. Please refer to the `schema_save_mode` section below. |
+| data_save_mode | Enum | no | APPEND_DATA | Data save mode. Please refer to the `data_save_mode` section below. |
+| save_mode_create_template | string | no | see below | See below. |
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details. |
+### schema_save_mode[Enum]
+
+Before starting the synchronization task, choose different processing options for the existing table schema.
+Option descriptions:
+`RECREATE_SCHEMA`: Create the table if it does not exist; drop and recreate the table when saving.
+`CREATE_SCHEMA_WHEN_NOT_EXIST`: Create the table if it does not exist; skip if the table already exists.
+`ERROR_WHEN_SCHEMA_NOT_EXIST`: Throw an error if the table does not exist.
+`IGNORE`: Ignore the processing of the table.
+
+### data_save_mode[Enum]
+
+Before starting the synchronization task, choose different processing options for the existing data on the target side.
+Option descriptions:
+`DROP_DATA`: Retain the database schema but delete the data.
+`APPEND_DATA`: Retain the database schema and the data.
+`CUSTOM_PROCESSING`: Custom user-defined processing.
+`ERROR_WHEN_DATA_EXISTS`: Throw an error if data exists.
+
+### save_mode_create_template
+
+Automatically create Doris tables using templates.
+The table creation statements will be generated based on the upstream data types and schema. The default template can be modified as needed.
+
+Default template:
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
+ ${rowtype_primary_key},
+ ${rowtype_fields}
+) ENGINE = MergeTree()
+ORDER BY (${rowtype_primary_key})
+PRIMARY KEY (${rowtype_primary_key})
+SETTINGS
+ index_granularity = 8192;
+```
+
+If custom fields are added to the template, for example, adding an `id` field:
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
+ id,
+ ${rowtype_fields}
+) ENGINE = MergeTree()
+ ORDER BY (${rowtype_primary_key})
+ PRIMARY KEY (${rowtype_primary_key})
+ SETTINGS
+ index_granularity = 8192;
+```
+
+The connector will automatically retrieve the corresponding types from the upstream source and fill in the template, removing the `id` field from the `rowtype_fields`. This method can be used to modify custom field types and attributes.
+
+The following placeholders can be used:
+
+- `database`: Retrieves the database from the upstream schema.
+- `table_name`: Retrieves the table name from the upstream schema.
+- `rowtype_fields`: Retrieves all fields from the upstream schema and automatically maps them to Doris field descriptions.
+- `rowtype_primary_key`: Retrieves the primary key from the upstream schema (this may be a list).
+- `rowtype_unique_key`: Retrieves the unique key from the upstream schema (this may be a list).
+
## How to Create a Clickhouse Data Synchronization Jobs
The following example demonstrates how to create a data synchronization job that writes randomly generated data to a Clickhouse database:
diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md
index 938471c862a..527a94fc94a 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -203,7 +203,7 @@ source {
sink {
Assert {
- source_table_name = hive_source
+ plugin_input = hive_source
rules {
row_rules = [
{
diff --git a/docs/zh/connector-v2/sink/Clickhouse.md b/docs/zh/connector-v2/sink/Clickhouse.md
index 61a359f5c0b..41e9cefc60d 100644
--- a/docs/zh/connector-v2/sink/Clickhouse.md
+++ b/docs/zh/connector-v2/sink/Clickhouse.md
@@ -42,7 +42,7 @@
| ARRAY | Array |
| MAP | Map |
-## 输出选项
+## Sink 选项
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|---------------------------------------|---------|------|-------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
@@ -58,8 +58,71 @@
| primary_key | String | No | - | 标记`clickhouse`表中的主键列,并根据主键执行INSERT/UPDATE/DELETE到`clickhouse`表. |
| support_upsert | Boolean | No | false | 支持按查询主键更新插入行. |
| allow_experimental_lightweight_delete | Boolean | No | false | 允许基于`MergeTree`表引擎实验性轻量级删除. |
+| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的`schema_save_mode` |
+| data_save_mode | Enum | no | APPEND_DATA | 数据保存模式,请参考下面的`data_save_mode`。 |
+| save_mode_create_template | string | no | see below | 见下文。 |
| common-options | | No | - | Sink插件查用参数,详见[Sink常用选项](../sink-common-options.md). |
+### schema_save_mode[Enum]
+
+在开启同步任务之前,针对现有的表结构选择不同的处理方案。
+选项介绍:
+`RECREATE_SCHEMA` :表不存在时创建,表保存时删除并重建。
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :表不存在时会创建,表存在时跳过。
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :表不存在时会报错。
+`IGNORE` :忽略对表的处理。
+
+### data_save_mode[Enum]
+
+在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。
+选项介绍:
+`DROP_DATA`: 保留数据库结构并删除数据。
+`APPEND_DATA`:保留数据库结构,保留数据。
+`CUSTOM_PROCESSING`:用户自定义处理。
+`ERROR_WHEN_DATA_EXISTS`:有数据时报错。
+
+### save_mode_create_template
+
+使用模板自动创建Doris表,
+会根据上游数据类型和schema类型创建相应的建表语句,
+默认模板可以根据情况进行修改。
+
+默认模板:
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
+ ${rowtype_primary_key},
+ ${rowtype_fields}
+) ENGINE = MergeTree()
+ORDER BY (${rowtype_primary_key})
+PRIMARY KEY (${rowtype_primary_key})
+SETTINGS
+ index_granularity = 8192;
+```
+
+如果模板中填写了自定义字段,例如添加 id 字段
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
+ id,
+ ${rowtype_fields}
+) ENGINE = MergeTree()
+ ORDER BY (${rowtype_primary_key})
+ PRIMARY KEY (${rowtype_primary_key})
+ SETTINGS
+ index_granularity = 8192;
+```
+
+连接器会自动从上游获取对应类型完成填充,
+并从“rowtype_fields”中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。
+
+可以使用以下占位符:
+
+- database:用于获取上游schema中的数据库。
+- table_name:用于获取上游schema中的表名。
+- rowtype_fields:用于获取上游schema中的所有字段,自动映射到Doris的字段描述。
+- rowtype_primary_key:用于获取上游模式中的主键(可能是列表)。
+- rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)。
+
## 如何创建一个clickhouse 同步任务
以下示例演示如何创建将随机生成的数据写入Clickhouse数据库的数据同步作业。
diff --git a/docs/zh/connector-v2/source/Hive.md b/docs/zh/connector-v2/source/Hive.md
index f1bec9fd956..094d701b455 100644
--- a/docs/zh/connector-v2/source/Hive.md
+++ b/docs/zh/connector-v2/source/Hive.md
@@ -187,7 +187,7 @@ source {
table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
metastore_uri = "thrift://metastore:9083"
hive.hadoop.conf-path = "/tmp/hadoop"
- result_table_name = hive_source
+ plugin_output = hive_source
hive_site_path = "/tmp/hive-site.xml"
kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
kerberos_keytab_path = "/tmp/hive.keytab"
@@ -197,7 +197,7 @@ source {
sink {
Assert {
- source_table_name = hive_source
+ plugin_input = hive_source
rules {
row_rules = [
{
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
index 002b60c49bd..7c550b3cc3d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
@@ -30,7 +30,10 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
SOURCE_ALREADY_HAS_DATA("API-10", "The target data source already has data"),
- SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");
+ SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist"),
+ LIST_DATABASES_FAILED("API-12", "List databases failed"),
+ LIST_TABLES_FAILED("API-13", "List tables failed"),
+ GET_PRIMARY_KEY_FAILED("API-14", "Get primary key failed");
private final String code;
private final String description;
diff --git a/seatunnel-connectors-v2/connector-clickhouse/pom.xml b/seatunnel-connectors-v2/connector-clickhouse/pom.xml
index 22d2565a63a..93ffad1d60a 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/pom.xml
+++ b/seatunnel-connectors-v2/connector-clickhouse/pom.xml
@@ -68,6 +68,11 @@
${project.version}
optional
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java
new file mode 100644
index 00000000000..4c7bba896ef
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseCatalogUtil;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.clickhouse.client.ClickHouseColumn;
+import com.clickhouse.client.ClickHouseNode;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+
+@Slf4j
+public class ClickhouseCatalog implements Catalog {
+
+ protected String defaultDatabase = "information_schema";
+ private ReadonlyConfig readonlyConfig;
+ private ClickhouseProxy proxy;
+ private final String template;
+
+ private String catalogName;
+
+ public ClickhouseCatalog(ReadonlyConfig readonlyConfig, String catalogName) {
+ this.readonlyConfig = readonlyConfig;
+ this.catalogName = catalogName;
+ this.template = readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE);
+ }
+
+ @Override
+ public List listDatabases() throws CatalogException {
+ return proxy.listDatabases();
+ }
+
+ @Override
+ public List listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(this.catalogName, databaseName);
+ }
+
+ return proxy.listTable(databaseName);
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ List clickHouseColumns =
+ proxy.getClickHouseColumns(tablePath.getFullNameWithQuoted());
+
+ try {
+ Optional primaryKey =
+ proxy.getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
+
+ TableSchema.Builder builder = TableSchema.builder();
+ primaryKey.ifPresent(builder::primaryKey);
+ buildColumnsWithErrorCheck(
+ tablePath,
+ builder,
+ clickHouseColumns.iterator(),
+ column ->
+ PhysicalColumn.of(
+ column.getColumnName(),
+ TypeConvertUtil.convert(column),
+ (long) column.getEstimatedLength(),
+ column.getScale(),
+ column.isNullable(),
+ null,
+ null));
+
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(
+ catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+ return CatalogTable.of(
+ tableIdentifier,
+ builder.build(),
+ buildConnectorOptions(tablePath),
+ Collections.emptyList(),
+ "");
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed getting table %s", tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ log.debug("Create table :{}.{}", tablePath.getDatabaseName(), tablePath.getTableName());
+ proxy.createTable(
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ template,
+ table.getTableSchema());
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ proxy.dropTable(tablePath, ignoreIfNotExists);
+ }
+
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ if (tableExists(tablePath)) {
+ proxy.truncateTable(tablePath, ignoreIfNotExists);
+ }
+ } catch (Exception e) {
+ throw new CatalogException("Truncate table failed", e);
+ }
+ }
+
+ @Override
+ public void executeSql(TablePath tablePath, String sql) {
+ try {
+ proxy.executeSql(sql);
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed EXECUTE SQL in catalog %s", sql), e);
+ }
+ }
+
+ @Override
+ public boolean isExistsData(TablePath tablePath) {
+ try {
+ return proxy.isExistsData(tablePath.getFullName());
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ proxy.createDatabase(tablePath.getDatabaseName(), ignoreIfExists);
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ proxy.dropDatabase(tablePath.getDatabaseName(), ignoreIfNotExists);
+ }
+
+ @SuppressWarnings("MagicNumber")
+ private Map buildConnectorOptions(TablePath tablePath) {
+ Map options = new HashMap<>(8);
+ options.put("connector", "clickhouse");
+ options.put("host", readonlyConfig.get(HOST));
+ options.put("database", tablePath.getDatabaseName());
+ return options;
+ }
+
+ @Override
+ public String getDefaultDatabase() {
+ return defaultDatabase;
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ List nodes = ClickhouseUtil.createNodes(readonlyConfig);
+ Properties clickhouseProperties = new Properties();
+ readonlyConfig
+ .get(CLICKHOUSE_CONFIG)
+ .forEach((key, value) -> clickhouseProperties.put(key, String.valueOf(value)));
+
+ clickhouseProperties.put("user", readonlyConfig.get(USERNAME));
+ clickhouseProperties.put("password", readonlyConfig.get(PASSWORD));
+ proxy = new ClickhouseProxy(nodes.get(0));
+ }
+
+ @Override
+ public void close() throws CatalogException {}
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ checkArgument(StringUtils.isNotBlank(databaseName));
+ return listDatabases().contains(databaseName);
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ return proxy.tableExists(tablePath.getDatabaseName(), tablePath.getTableName());
+ }
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ Preconditions.checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
+ return new SQLPreviewResult(
+ ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+ template,
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ catalogTable.get().getTableSchema(),
+ ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()));
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new SQLPreviewResult(
+ ClickhouseCatalogUtil.INSTANCE.getDropTableSql(tablePath, true));
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new SQLPreviewResult(
+ ClickhouseCatalogUtil.INSTANCE.getTruncateTableSql(tablePath));
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new SQLPreviewResult(
+ ClickhouseCatalogUtil.INSTANCE.getCreateDatabaseSql(
+ tablePath.getDatabaseName(), true));
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new SQLPreviewResult(
+ ClickhouseCatalogUtil.INSTANCE.getDropDatabaseSql(
+ tablePath.getDatabaseName(), true));
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type: " + actionType);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalogFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalogFactory.java
new file mode 100644
index 00000000000..12e7c8490bd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalogFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ClickhouseCatalogFactory implements CatalogFactory {
+
+ public static final String IDENTIFIER = "clickhouse";
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ return new ClickhouseCatalog(options, catalogName);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(ClickhouseConfig.HOST)
+ .required(ClickhouseConfig.DATABASE)
+ .required(ClickhouseConfig.USERNAME)
+ .required(ClickhouseConfig.PASSWORD)
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseTypeConverter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseTypeConverter.java
new file mode 100644
index 00000000000..6259d2f9075
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseTypeConverter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeConverter;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(TypeConverter.class)
+public class ClickhouseTypeConverter
+ implements BasicTypeConverter> {
+ public static final ClickhouseTypeConverter INSTANCE = new ClickhouseTypeConverter();
+ public static final Integer MAX_DATETIME_SCALE = 9;
+ public static final String IDENTIFIER = "Clickhouse";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Column convert(BasicTypeDefine typeDefine) {
+ throw new UnsupportedOperationException("Unsupported operation");
+ }
+
+ @Override
+ public BasicTypeDefine reconvert(Column column) {
+ BasicTypeDefine.BasicTypeDefineBuilder builder =
+ BasicTypeDefine.builder()
+ .name(column.getName())
+ .nullable(column.isNullable())
+ .comment(column.getComment())
+ .defaultValue(column.getDefaultValue());
+
+ switch (column.getDataType().getSqlType()) {
+ case BOOLEAN:
+ builder.columnType(ClickhouseType.BOOLEAN);
+ builder.dataType(ClickhouseType.BOOLEAN);
+ break;
+ case TINYINT:
+ builder.columnType(ClickhouseType.TINYINT);
+ builder.dataType(ClickhouseType.TINYINT);
+ break;
+ case SMALLINT:
+ builder.columnType(ClickhouseType.SMALLINT);
+ builder.dataType(ClickhouseType.SMALLINT);
+ break;
+ case INT:
+ builder.columnType(ClickhouseType.INT);
+ builder.dataType(ClickhouseType.INT);
+ break;
+ case BIGINT:
+ builder.columnType(ClickhouseType.BIGINT);
+ builder.dataType(ClickhouseType.BIGINT);
+ break;
+ case FLOAT:
+ builder.columnType(ClickhouseType.FLOAT);
+ builder.dataType(ClickhouseType.FLOAT);
+ break;
+ case DOUBLE:
+ builder.columnType(ClickhouseType.DOUBLE);
+ builder.dataType(ClickhouseType.DOUBLE);
+ break;
+ case DATE:
+ builder.columnType(ClickhouseType.DATE);
+ builder.dataType(ClickhouseType.DATE);
+ break;
+ case TIME:
+ case STRING:
+ builder.columnType(ClickhouseType.STRING);
+ builder.dataType(ClickhouseType.STRING);
+ break;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) column.getDataType();
+ builder.columnType(
+ String.format(
+ "%s(%s, %s)",
+ ClickhouseType.DECIMAL,
+ decimalType.getPrecision(),
+ decimalType.getScale()));
+ builder.dataType(ClickhouseType.DECIMAL);
+ break;
+ case TIMESTAMP:
+ if (column.getScale() != null
+ && column.getScale() > 0
+ && column.getScale() <= MAX_DATETIME_SCALE) {
+ builder.columnType(
+ String.format("%s(%s)", ClickhouseType.DateTime64, column.getScale()));
+ builder.scale(column.getScale());
+ } else {
+ builder.columnType(String.format("%s(%s)", ClickhouseType.DateTime64, 0));
+ builder.scale(0);
+ }
+ builder.dataType(ClickhouseType.DateTime64);
+ break;
+ case MAP:
+ MapType dataType = (MapType) column.getDataType();
+ SeaTunnelDataType keyType = dataType.getKeyType();
+ SeaTunnelDataType valueType = dataType.getValueType();
+ Column keyColumn =
+ PhysicalColumn.of(
+ column.getName() + ".key",
+ (SeaTunnelDataType>) keyType,
+ (Long) null,
+ true,
+ null,
+ null);
+ String keyColumnType = reconvert(keyColumn).getColumnType();
+ Column valueColumn =
+ PhysicalColumn.of(
+ column.getName() + ".value",
+ (SeaTunnelDataType>) valueType,
+ (Long) null,
+ true,
+ null,
+ null);
+ String valueColumnType = reconvert(valueColumn).getColumnType();
+
+ builder.dataType(ClickhouseType.MAP);
+ builder.columnType(
+ String.format(
+ "%s(%s, %s)", ClickhouseType.MAP, keyColumnType, valueColumnType));
+ break;
+ case ARRAY:
+ SeaTunnelDataType> arrayDataType = column.getDataType();
+ SeaTunnelDataType elementType = null;
+ if (arrayDataType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) arrayDataType;
+ elementType = arrayType.getElementType();
+ }
+
+ Column arrayKeyColumn =
+ PhysicalColumn.of(
+ column.getName() + ".key",
+ (SeaTunnelDataType>) elementType,
+ (Long) null,
+ true,
+ null,
+ null);
+ String arrayKeyColumnType = reconvert(arrayKeyColumn).getColumnType();
+ builder.dataType(ClickhouseType.ARRAY);
+ builder.columnType(
+ String.format("%s(%s)", ClickhouseType.ARRAY, arrayKeyColumnType));
+ break;
+ default:
+ throw CommonError.convertToConnectorTypeError(
+ IDENTIFIER, column.getDataType().getSqlType().name(), column.getName());
+ }
+ return builder.build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index bb0417b1712..1408430149c 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -19,6 +19,9 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import java.time.ZoneId;
import java.util.Collections;
@@ -178,4 +181,48 @@ public class ClickhouseConfig {
.defaultValue("/tmp/seatunnel/clickhouse-local/file")
.withDescription(
"The directory where ClickhouseFile stores temporary files locally.");
+ public static final Option SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription(
+ "different treatment schemes are selected for the existing surface structure of the target side");
+
+ public static final Option DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .enumType(DataSaveMode.class)
+ .defaultValue(DataSaveMode.APPEND_DATA)
+ .withDescription(
+ "different processing schemes are selected for data existing data on the target side");
+
+ public static final Option CUSTOM_SQL =
+ Options.key("custom_sql")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("when schema_save_mode selects CUSTOM_PROCESSING custom SQL");
+
+ public static final Option SAVE_MODE_CREATE_TEMPLATE =
+ Options.key("save_mode_create_template")
+ .stringType()
+ .defaultValue(
+ "CREATE TABLE IF NOT EXISTS `"
+ + SaveModePlaceHolder.DATABASE.getPlaceHolder()
+ + "`.`"
+ + SaveModePlaceHolder.TABLE.getPlaceHolder()
+ + "` (\n"
+ + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ + ",\n"
+ + SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+ + "\n"
+ + ") ENGINE = MergeTree()\n"
+ + "ORDER BY ("
+ + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ + ")\n"
+ + "PRIMARY KEY ("
+ + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ + ")\n"
+ + "SETTINGS\n"
+ + " index_granularity = 8192;")
+ .withDescription(
+ "Create table statement template, used to create Clickhouse table");
}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseType.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseType.java
new file mode 100644
index 00000000000..01fb0c57c01
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.Map;
+
+@Getter
+@AllArgsConstructor
+public class ClickhouseType {
+
+ public static final String STRING = "String";
+ public static final String TINYINT = "Int8";
+ public static final String SMALLINT = "Int16";
+ public static final String INT = "Int32";
+ public static final String BIGINT = "Int64";
+ public static final String FLOAT = "Float32";
+ public static final String BOOLEAN = "Bool";
+ public static final String DOUBLE = "Float64";
+ public static final String DATE = "Date";
+ public static final String DateTime64 = "DateTime64";
+ public static final String MAP = "Map";
+ public static final String ARRAY = "Array";
+ public static final String DECIMAL = "Decimal";
+ private String type;
+ private Map options;
+}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 22f18694e23..644b078b5bf 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -17,30 +17,68 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseCatalog;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseCatalogFactory;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import com.clickhouse.client.ClickHouseNode;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CUSTOM_SQL;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
public class ClickhouseSink
- implements SeaTunnelSink {
+ implements SeaTunnelSink,
+ SupportSaveMode {
private ReaderOption option;
private CatalogTable catalogTable;
- public ClickhouseSink(ReaderOption option, CatalogTable catalogTable) {
- this.option = option;
+ private ReadonlyConfig readonlyConfig;
+
+ public ClickhouseSink(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
this.catalogTable = catalogTable;
+ this.readonlyConfig = readonlyConfig;
}
@Override
@@ -51,6 +89,76 @@ public String getPluginName() {
@Override
public SinkWriter createWriter(
SinkWriter.Context context) throws IOException {
+ List nodes = ClickhouseUtil.createNodes(readonlyConfig);
+ Properties clickhouseProperties = new Properties();
+ readonlyConfig
+ .get(CLICKHOUSE_CONFIG)
+ .forEach((key, value) -> clickhouseProperties.put(key, String.valueOf(value)));
+
+ clickhouseProperties.put("user", readonlyConfig.get(USERNAME));
+ clickhouseProperties.put("password", readonlyConfig.get(PASSWORD));
+ ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+
+ Map tableSchema = proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE));
+ String shardKey = null;
+ String shardKeyType = null;
+ ClickhouseTable table =
+ proxy.getClickhouseTable(readonlyConfig.get(DATABASE), readonlyConfig.get(TABLE));
+ if (readonlyConfig.get(SPLIT_MODE)) {
+ if (!"Distributed".equals(table.getEngine())) {
+ throw new ClickhouseConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "split mode only support table which engine is "
+ + "'Distributed' engine at now");
+ }
+ if (readonlyConfig.getOptional(SHARDING_KEY).isPresent()) {
+ shardKey = readonlyConfig.get(SHARDING_KEY);
+ shardKeyType = tableSchema.get(shardKey);
+ }
+ }
+ ShardMetadata metadata =
+ new ShardMetadata(
+ shardKey,
+ shardKeyType,
+ table.getSortingKey(),
+ readonlyConfig.get(DATABASE),
+ readonlyConfig.get(TABLE),
+ table.getEngine(),
+ readonlyConfig.get(SPLIT_MODE),
+ new Shard(1, 1, nodes.get(0)),
+ readonlyConfig.get(USERNAME),
+ readonlyConfig.get(PASSWORD));
+ proxy.close();
+ String[] primaryKeys = null;
+ if (readonlyConfig.getOptional(PRIMARY_KEY).isPresent()) {
+ String primaryKey = readonlyConfig.get(PRIMARY_KEY);
+ if (primaryKey == null || primaryKey.trim().isEmpty()) {
+ throw new ClickhouseConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "primary_key can not be empty");
+ }
+ if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
+ throw new ClickhouseConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "sharding_key and primary_key must be consistent to ensure correct processing of cdc events");
+ }
+ primaryKeys = primaryKey.replaceAll("\\s+", "").split(",");
+ }
+ boolean supportUpsert = readonlyConfig.get(SUPPORT_UPSERT);
+ boolean allowExperimentalLightweightDelete =
+ readonlyConfig.get(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE);
+
+ ReaderOption option =
+ ReaderOption.builder()
+ .shardMetadata(metadata)
+ .properties(clickhouseProperties)
+ .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+ .tableEngine(table.getEngine())
+ .tableSchema(tableSchema)
+ .bulkSize(readonlyConfig.get(BULK_SIZE))
+ .primaryKeys(primaryKeys)
+ .supportUpsert(supportUpsert)
+ .allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
+ .build();
return new ClickhouseSinkWriter(option, context);
}
@@ -69,4 +177,21 @@ public Optional> getWriterStateSerializer() {
public Optional getWriteCatalogTable() {
return Optional.of(catalogTable);
}
+
+ @Override
+ public Optional getSaveModeHandler() {
+ TablePath tablePath = TablePath.of(readonlyConfig.get(DATABASE), readonlyConfig.get(TABLE));
+ ClickhouseCatalog clickhouseCatalog =
+ new ClickhouseCatalog(readonlyConfig, ClickhouseCatalogFactory.IDENTIFIER);
+ SchemaSaveMode schemaSaveMode = readonlyConfig.get(ClickhouseConfig.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode = readonlyConfig.get(ClickhouseConfig.DATA_SAVE_MODE);
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ schemaSaveMode,
+ dataSaveMode,
+ clickhouseCatalog,
+ tablePath,
+ catalogTable,
+ readonlyConfig.get(CUSTOM_SQL)));
+ }
}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
index edc36eabbaf..0640ba59fac 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
@@ -24,27 +24,9 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
-import com.clickhouse.client.ClickHouseNode;
import com.google.auto.service.AutoService;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
@@ -66,87 +48,10 @@ public String factoryIdentifier() {
}
@Override
- public TableSink createSink(
- TableSinkFactoryContext context) {
+ public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig readonlyConfig = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
- List nodes = ClickhouseUtil.createNodes(readonlyConfig);
- Properties clickhouseProperties = new Properties();
- readonlyConfig
- .get(CLICKHOUSE_CONFIG)
- .forEach((key, value) -> clickhouseProperties.put(key, String.valueOf(value)));
-
- clickhouseProperties.put("user", readonlyConfig.get(USERNAME));
- clickhouseProperties.put("password", readonlyConfig.get(PASSWORD));
- ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
- try {
- Map tableSchema =
- proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE));
- String shardKey = null;
- String shardKeyType = null;
- ClickhouseTable table =
- proxy.getClickhouseTable(
- readonlyConfig.get(DATABASE), readonlyConfig.get(TABLE));
- if (readonlyConfig.get(SPLIT_MODE)) {
- if (!"Distributed".equals(table.getEngine())) {
- throw new ClickhouseConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "split mode only support table which engine is "
- + "'Distributed' engine at now");
- }
- if (readonlyConfig.getOptional(SHARDING_KEY).isPresent()) {
- shardKey = readonlyConfig.get(SHARDING_KEY);
- shardKeyType = tableSchema.get(shardKey);
- }
- }
- ShardMetadata metadata =
- new ShardMetadata(
- shardKey,
- shardKeyType,
- table.getSortingKey(),
- readonlyConfig.get(DATABASE),
- readonlyConfig.get(TABLE),
- table.getEngine(),
- readonlyConfig.get(SPLIT_MODE),
- new Shard(1, 1, nodes.get(0)),
- readonlyConfig.get(USERNAME),
- readonlyConfig.get(PASSWORD));
- proxy.close();
- String[] primaryKeys = null;
- if (readonlyConfig.getOptional(PRIMARY_KEY).isPresent()) {
- String primaryKey = readonlyConfig.get(PRIMARY_KEY);
- if (primaryKey == null || primaryKey.trim().isEmpty()) {
- throw new ClickhouseConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "primary_key can not be empty");
- }
- if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
- throw new ClickhouseConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "sharding_key and primary_key must be consistent to ensure correct processing of cdc events");
- }
- primaryKeys = primaryKey.replaceAll("\\s+", "").split(",");
- }
- boolean supportUpsert = readonlyConfig.get(SUPPORT_UPSERT);
- boolean allowExperimentalLightweightDelete =
- readonlyConfig.get(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE);
-
- ReaderOption option =
- ReaderOption.builder()
- .shardMetadata(metadata)
- .properties(clickhouseProperties)
- .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
- .tableEngine(table.getEngine())
- .tableSchema(tableSchema)
- .bulkSize(readonlyConfig.get(BULK_SIZE))
- .primaryKeys(primaryKeys)
- .supportUpsert(supportUpsert)
- .allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
- .build();
- return () -> new ClickhouseSink(option, catalogTable);
- } finally {
- proxy.close();
- }
+ return () -> new ClickhouseSink(catalogTable, readonlyConfig);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
new file mode 100644
index 00000000000..bf4e02c3fb1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseTypeConverter;
+import org.apache.seatunnel.connectors.seatunnel.common.util.CatalogUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class ClickhouseCatalogUtil extends CatalogUtil {
+
+ public static final ClickhouseCatalogUtil INSTANCE = new ClickhouseCatalogUtil();
+
+ public String columnToConnectorType(Column column) {
+ checkNotNull(column, "The column is required.");
+ return String.format(
+ "`%s` %s %s",
+ column.getName(),
+ ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType(),
+ StringUtils.isEmpty(column.getComment())
+ ? ""
+ : "COMMENT '" + column.getComment() + "'");
+ }
+
+ public String getDropTableSql(TablePath tablePath, boolean ignoreIfNotExists) {
+ if (ignoreIfNotExists) {
+ return "DROP TABLE IF EXISTS "
+ + tablePath.getDatabaseName()
+ + "."
+ + tablePath.getTableName();
+ } else {
+ return "DROP TABLE " + tablePath.getDatabaseName() + "." + tablePath.getTableName();
+ }
+ }
+
+ public String getTruncateTableSql(TablePath tablePath) {
+ return "TRUNCATE TABLE " + tablePath.getDatabaseName() + "." + tablePath.getTableName();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
index c4178182578..be48f728b16 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
@@ -18,28 +18,39 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRecord;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
+import lombok.extern.slf4j.Slf4j;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+@Slf4j
@SuppressWarnings("magicnumber")
public class ClickhouseProxy {
@@ -153,6 +164,19 @@ public Map getClickhouseTableSchema(
return schema;
}
+ public List getClickHouseColumns(String table) {
+ String sql = "desc " + table;
+ try (ClickHouseResponse response = this.clickhouseRequest.query(sql).executeAndWait()) {
+ return response.getColumns();
+
+ } catch (ClickHouseException e) {
+ throw new ClickhouseConnectorException(
+ CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
+ "Cannot get table schema from clickhouse",
+ e);
+ }
+ }
+
/**
* Get the shard of the given cluster.
*
@@ -267,6 +291,137 @@ public String localizationEngine(String engine, String ddl) {
}
}
+ public boolean tableExists(String database, String table) {
+ String sql =
+ String.format(
+ "select count(1) from system.tables where database = '%s' and name = '%s'",
+ database, table);
+ try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) {
+ return response.firstRecord().getValue(0).asInteger() > 0;
+ } catch (ClickHouseException e) {
+ throw new ClickhouseConnectorException(
+ SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table from clickhouse", e);
+ }
+ }
+
+ public List listDatabases() {
+ String sql = "select distinct database from system.tables";
+ try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) {
+ Iterable records = response.records();
+ return StreamSupport.stream(records.spliterator(), false)
+ .map(r -> r.getValue(0).asString())
+ .collect(Collectors.toList());
+ } catch (ClickHouseException e) {
+ throw new ClickhouseConnectorException(
+ SeaTunnelAPIErrorCode.LIST_DATABASES_FAILED,
+ "Cannot list databases from clickhouse",
+ e);
+ }
+ }
+
+ public List listTable(String database) {
+ String sql = "SELECT name FROM system.tables WHERE database = '" + database + "'";
+ try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) {
+ Iterable records = response.records();
+ return StreamSupport.stream(records.spliterator(), false)
+ .map(r -> r.getValue(0).asString())
+ .collect(Collectors.toList());
+ } catch (ClickHouseException e) {
+ throw new ClickhouseConnectorException(
+ SeaTunnelAPIErrorCode.LIST_TABLES_FAILED,
+ "Cannot list tables from clickhouse",
+ e);
+ }
+ }
+
+ public void executeSql(String sql) {
+ try {
+ clickhouseRequest
+ .write()
+ .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+ .query(sql)
+ .execute()
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void createTable(
+ String database, String table, String template, TableSchema tableSchema) {
+ String createTableSql =
+ ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+ template,
+ database,
+ table,
+ tableSchema,
+ ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
+ log.debug("Create Clickhouse table sql: {}", createTableSql);
+ executeSql(createTableSql);
+ }
+
+ public Optional getPrimaryKey(String schema, String table) throws SQLException {
+
+ List pkFields;
+ String sql =
+ "SELECT\n"
+ + " name as column_name\n"
+ + "FROM system.columns\n"
+ + "WHERE table = '"
+ + table
+ + "'\n"
+ + " AND database = '"
+ + schema
+ + "'\n"
+ + " AND is_in_primary_key = 1\n"
+ + "ORDER BY position;";
+ try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) {
+ Iterable records = response.records();
+ pkFields =
+ StreamSupport.stream(records.spliterator(), false)
+ .map(r -> r.getValue(0).asString())
+ .collect(Collectors.toList());
+ } catch (ClickHouseException e) {
+ throw new ClickhouseConnectorException(
+ SeaTunnelAPIErrorCode.GET_PRIMARY_KEY_FAILED,
+ "Cannot get primary key from clickhouse",
+ e);
+ }
+ if (!pkFields.isEmpty()) {
+ // PK_NAME maybe null according to the javadoc, generate a unique name in that case
+ String pkName = "pk_" + String.join("_", pkFields);
+ return Optional.of(PrimaryKey.of(pkName, pkFields));
+ }
+ return Optional.empty();
+ }
+
+ public boolean isExistsData(String tableName) throws ExecutionException, InterruptedException {
+ // 定义查询数据的SQL语句
+ String queryDataSql = "SELECT count(*) FROM " + tableName;
+ try (ClickHouseResponse response = clickhouseRequest.query(queryDataSql).executeAndWait()) {
+ return response.firstRecord().getValue(0).asInteger() > 0;
+ } catch (ClickHouseException e) {
+ throw new ClickhouseConnectorException(
+ SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table from clickhouse", e);
+ }
+ }
+
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) {
+ executeSql(ClickhouseCatalogUtil.INSTANCE.getDropTableSql(tablePath, ignoreIfNotExists));
+ }
+
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+ executeSql(ClickhouseCatalogUtil.INSTANCE.getTruncateTableSql(tablePath));
+ }
+
+ public void createDatabase(String database, boolean ignoreIfExists) {
+ executeSql(ClickhouseCatalogUtil.INSTANCE.getCreateDatabaseSql(database, ignoreIfExists));
+ }
+
+ public void dropDatabase(String database, boolean ignoreIfNotExists) {
+ executeSql(ClickhouseCatalogUtil.INSTANCE.getDropDatabaseSql(database, ignoreIfNotExists));
+ }
+
public void close() {
if (this.client != null) {
this.client.close();
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/CreateTableParser.java
similarity index 98%
rename from seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
rename to seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/CreateTableParser.java
index 6986967f7a4..f3d49d8aef8 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/CreateTableParser.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.starrocks.util;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
import lombok.Getter;
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
new file mode 100644
index 00000000000..5728b18bcfe
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse;
+
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseCatalogUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ClickhouseCreateTableTest {
+
+ @Test
+ public void test() {
+ List columns = new ArrayList<>();
+
+ columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, true, null, ""));
+ columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) null, true, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "age", BasicType.INT_TYPE, (Long) null, true, null, "test comment"));
+ columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, ""));
+ columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long) null, true, null, ""));
+ columns.add(
+ PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) null, true, null, ""));
+
+ String createTableSql =
+ ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+ "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n"
+ + " ${rowtype_primary_key},\n"
+ + " ${rowtype_fields}\n"
+ + ") ENGINE = MergeTree()\n"
+ + "ORDER BY (${rowtype_primary_key})\n"
+ + "PRIMARY KEY (${rowtype_primary_key})\n"
+ + "SETTINGS\n"
+ + " index_granularity = 8192;",
+ "test1",
+ "test2",
+ TableSchema.builder()
+ .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age")))
+ .constraintKey(
+ Arrays.asList(
+ ConstraintKey.of(
+ ConstraintKey.ConstraintType.UNIQUE_KEY,
+ "unique_key",
+ Collections.singletonList(
+ ConstraintKey.ConstraintKeyColumn
+ .of(
+ "name",
+ ConstraintKey
+ .ColumnSortType
+ .DESC))),
+ ConstraintKey.of(
+ ConstraintKey.ConstraintType.UNIQUE_KEY,
+ "unique_key2",
+ Collections.singletonList(
+ ConstraintKey.ConstraintKeyColumn
+ .of(
+ "score",
+ ConstraintKey
+ .ColumnSortType
+ .ASC)))))
+ .columns(columns)
+ .build(),
+ ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
+ Assertions.assertEquals(
+ createTableSql,
+ "CREATE TABLE IF NOT EXISTS `test1`.`test2` (\n"
+ + " `id` Int64 ,`age` Int32 COMMENT 'test comment',\n"
+ + " `name` String ,\n"
+ + "`score` Int32 ,\n"
+ + "`gender` Int8 ,\n"
+ + "`create_time` Int64 \n"
+ + ") ENGINE = MergeTree()\n"
+ + "ORDER BY (`id`,`age`)\n"
+ + "PRIMARY KEY (`id`,`age`)\n"
+ + "SETTINGS\n"
+ + " index_granularity = 8192;");
+ System.out.println(createTableSql);
+
+ String createTemplate = ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+ TableSchema tableSchema =
+ TableSchema.builder()
+ .primaryKey(PrimaryKey.of(StringUtils.EMPTY, Collections.emptyList()))
+ .constraintKey(Collections.emptyList())
+ .columns(columns)
+ .build();
+ TablePath tablePath = TablePath.of("test1.test2");
+ SeaTunnelRuntimeException actualSeaTunnelRuntimeException =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
+ ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+ createTemplate,
+ "test1",
+ "test2",
+ tableSchema,
+ ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()));
+
+ String primaryKeyHolder = SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
+ SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
+ CommonError.sqlTemplateHandledError(
+ tablePath.getFullName(),
+ SaveModePlaceHolder.getDisplay(primaryKeyHolder),
+ createTemplate,
+ primaryKeyHolder,
+ ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
+ Assertions.assertEquals(
+ exceptSeaTunnelRuntimeException.getMessage(),
+ actualSeaTunnelRuntimeException.getMessage());
+ }
+
+ @Test
+ public void testInSeq() {
+
+ List columns = new ArrayList<>();
+
+ columns.add(
+ PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_LINENUMBER", BasicType.INT_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_QUANTITY", new DecimalType(15, 2), (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_EXTENDEDPRICE", new DecimalType(15, 2), (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_DISCOUNT", new DecimalType(15, 2), (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of("L_TAX", new DecimalType(15, 2), (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_RETURNFLAG", BasicType.STRING_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_LINESTATUS", BasicType.STRING_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_COMMITDATE",
+ LocalTimeType.LOCAL_DATE_TYPE,
+ (Long) null,
+ false,
+ null,
+ ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_RECEIPTDATE",
+ LocalTimeType.LOCAL_DATE_TYPE,
+ (Long) null,
+ false,
+ null,
+ ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_SHIPINSTRUCT", BasicType.STRING_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_SHIPMODE", BasicType.STRING_TYPE, (Long) null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_COMMENT", BasicType.STRING_TYPE, (Long) null, false, null, ""));
+
+ String result =
+ ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+ "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n"
+ + "`L_COMMITDATE`,\n"
+ + "${rowtype_primary_key},\n"
+ + "L_SUPPKEY BIGINT NOT NULL,\n"
+ + "${rowtype_fields}\n"
+ + ") ENGINE=MergeTree()\n"
+ + " ORDER BY (L_COMMITDATE, ${rowtype_primary_key}, L_SUPPKEY)\n"
+ + " PRIMARY KEY (L_COMMITDATE, ${rowtype_primary_key}, L_SUPPKEY)\n"
+ + "SETTINGS\n"
+ + " index_granularity = 8192;",
+ "tpch",
+ "lineitem",
+ TableSchema.builder()
+ .primaryKey(
+ PrimaryKey.of(
+ "", Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
+ .columns(columns)
+ .build(),
+ ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
+ String expected =
+ "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
+ + "`L_COMMITDATE` Date ,\n"
+ + "`L_ORDERKEY` Int32 ,`L_LINENUMBER` Int32 ,\n"
+ + "L_SUPPKEY BIGINT NOT NULL,\n"
+ + "`L_PARTKEY` Int32 ,\n"
+ + "`L_QUANTITY` Decimal(15, 2) ,\n"
+ + "`L_EXTENDEDPRICE` Decimal(15, 2) ,\n"
+ + "`L_DISCOUNT` Decimal(15, 2) ,\n"
+ + "`L_TAX` Decimal(15, 2) ,\n"
+ + "`L_RETURNFLAG` String ,\n"
+ + "`L_LINESTATUS` String ,\n"
+ + "`L_SHIPDATE` Date ,\n"
+ + "`L_RECEIPTDATE` Date ,\n"
+ + "`L_SHIPINSTRUCT` String ,\n"
+ + "`L_SHIPMODE` String ,\n"
+ + "`L_COMMENT` String \n"
+ + ") ENGINE=MergeTree()\n"
+ + " ORDER BY (L_COMMITDATE, `L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n"
+ + " PRIMARY KEY (L_COMMITDATE, `L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n"
+ + "SETTINGS\n"
+ + " index_granularity = 8192;";
+ Assertions.assertEquals(result, expected);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
new file mode 100644
index 00000000000..7d10260cf5e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.util;
+
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public abstract class CatalogUtil {
+
+ public abstract String columnToConnectorType(Column column);
+
+ public String getCreateTableSql(
+ String template,
+ String database,
+ String table,
+ TableSchema tableSchema,
+ String optionsKey) {
+ String primaryKey = "";
+ if (tableSchema.getPrimaryKey() != null) {
+ primaryKey =
+ tableSchema.getPrimaryKey().getColumnNames().stream()
+ .map(r -> "`" + r + "`")
+ .collect(Collectors.joining(","));
+ }
+ String uniqueKey = "";
+ if (!tableSchema.getConstraintKeys().isEmpty()) {
+ uniqueKey =
+ tableSchema.getConstraintKeys().stream()
+ .flatMap(c -> c.getColumnNames().stream())
+ .map(r -> "`" + r.getColumnName() + "`")
+ .collect(Collectors.joining(","));
+ }
+ SqlTemplate.canHandledByTemplateWithPlaceholder(
+ template,
+ SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
+ primaryKey,
+ TablePath.of(database, table).getFullName(),
+ optionsKey);
+ template =
+ template.replaceAll(
+ SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
+ primaryKey);
+ SqlTemplate.canHandledByTemplateWithPlaceholder(
+ template,
+ SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
+ uniqueKey,
+ TablePath.of(database, table).getFullName(),
+ optionsKey);
+
+ template =
+ template.replaceAll(
+ SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
+ Map columnInTemplate =
+ CreateTableParser.getColumnList(template);
+ template = mergeColumnInTemplate(columnInTemplate, tableSchema, template);
+
+ String rowTypeFields =
+ tableSchema.getColumns().stream()
+ .filter(column -> !columnInTemplate.containsKey(column.getName()))
+ .map(x -> columnToConnectorType(x))
+ .collect(Collectors.joining(",\n"));
+
+ if (template.contains(SaveModePlaceHolder.TABLE_NAME.getPlaceHolder())) {
+ // TODO: Remove this compatibility config
+ template =
+ template.replaceAll(
+ SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), table);
+ log.warn(
+ "The variable placeholder `${table_name}` has been marked as deprecated and will be removed soon, please use `${table}`");
+ }
+
+ return template.replaceAll(SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), database)
+ .replaceAll(SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), table)
+ .replaceAll(
+ SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
+ }
+
+ private String mergeColumnInTemplate(
+ Map columnInTemplate,
+ TableSchema tableSchema,
+ String template) {
+ int offset = 0;
+ Map columnMap =
+ tableSchema.getColumns().stream()
+ .collect(Collectors.toMap(Column::getName, Function.identity()));
+ List columnInfosInSeq =
+ columnInTemplate.values().stream()
+ .sorted(
+ Comparator.comparingInt(
+ CreateTableParser.ColumnInfo::getStartIndex))
+ .collect(Collectors.toList());
+ for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) {
+ String col = columnInfo.getName();
+ if (StringUtils.isEmpty(columnInfo.getInfo())) {
+ if (columnMap.containsKey(col)) {
+ Column column = columnMap.get(col);
+ String newCol = columnToConnectorType(column);
+ String prefix = template.substring(0, columnInfo.getStartIndex() + offset);
+ String suffix = template.substring(offset + columnInfo.getEndIndex());
+ if (prefix.endsWith("`")) {
+ prefix = prefix.substring(0, prefix.length() - 1);
+ offset--;
+ }
+ if (suffix.startsWith("`")) {
+ suffix = suffix.substring(1);
+ offset--;
+ }
+ template = prefix + newCol + suffix;
+ offset += newCol.length() - columnInfo.getName().length();
+ } else {
+ throw new IllegalArgumentException("Can't find column " + col + " in table.");
+ }
+ }
+ }
+ return template;
+ }
+
+ public String getDropDatabaseSql(String database, boolean ignoreIfNotExists) {
+ if (ignoreIfNotExists) {
+ return "DROP DATABASE IF EXISTS `" + database + "`";
+ } else {
+ return "DROP DATABASE `" + database + "`";
+ }
+ }
+
+ public String getCreateDatabaseSql(String database, boolean ignoreIfExists) {
+ if (ignoreIfExists) {
+ return "CREATE DATABASE IF NOT EXISTS `" + database + "`";
+ } else {
+ return "CREATE DATABASE `" + database + "`";
+ }
+ }
+
+ public String getDropTableSql(TablePath tablePath, boolean ignoreIfNotExists) {
+ if (ignoreIfNotExists) {
+ return "DROP TABLE IF EXISTS " + tablePath.getFullName();
+ } else {
+ return "DROP TABLE " + tablePath.getFullName();
+ }
+ }
+
+ public String getTruncateTableSql(TablePath tablePath) {
+ return "TRUNCATE TABLE " + tablePath.getFullName();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/CreateTableParser.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CreateTableParser.java
similarity index 98%
rename from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/CreateTableParser.java
rename to seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CreateTableParser.java
index a911f1e1a2c..7baff4c5c32 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/CreateTableParser.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CreateTableParser.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.doris.util;
+package org.apache.seatunnel.connectors.seatunnel.common.util;
import lombok.Getter;
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index c7ad6f65052..91ce2a51d2d 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
+import org.apache.seatunnel.connectors.seatunnel.common.util.CreateTableParser;
import org.apache.commons.lang3.StringUtils;
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index aecacf3eb28..ae97cccfa43 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -40,6 +40,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;
@@ -203,11 +204,12 @@ public CatalogTable getTable(TablePath tablePath)
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
this.createTable(
- StarRocksSaveModeUtil.getCreateTableSql(
+ StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
template,
tablePath.getDatabaseName(),
tablePath.getTableName(),
- table.getTableSchema()));
+ table.getTableSchema(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()));
}
@Override
@@ -215,7 +217,9 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try {
conn.createStatement()
- .execute(StarRocksSaveModeUtil.getDropTableSql(tablePath, ignoreIfNotExists));
+ .execute(
+ StarRocksSaveModeUtil.INSTANCE.getDropTableSql(
+ tablePath, ignoreIfNotExists));
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", catalogName), e);
@@ -227,7 +231,7 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
try {
if (ignoreIfNotExists) {
conn.createStatement()
- .execute(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
+ .execute(StarRocksSaveModeUtil.INSTANCE.getTruncateTableSql(tablePath));
}
} catch (Exception e) {
throw new CatalogException(
@@ -264,7 +268,7 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
try {
conn.createStatement()
.execute(
- StarRocksSaveModeUtil.getCreateDatabaseSql(
+ StarRocksSaveModeUtil.INSTANCE.getCreateDatabaseSql(
tablePath.getDatabaseName(), ignoreIfExists));
} catch (Exception e) {
throw new CatalogException(
@@ -278,7 +282,7 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
try {
conn.createStatement()
.execute(
- StarRocksSaveModeUtil.getDropDatabaseSql(
+ StarRocksSaveModeUtil.INSTANCE.getDropDatabaseSql(
tablePath.getDatabaseName(), ignoreIfNotExists));
} catch (Exception e) {
throw new CatalogException(
@@ -360,8 +364,6 @@ private Map buildConnectorOptions(TablePath tablePath) {
options.put("connector", "starrocks");
options.put("url", baseUrl + tablePath.getDatabaseName());
options.put("table-name", tablePath.getFullName());
- options.put("username", username);
- options.put("password", pwd);
return options;
}
@@ -497,18 +499,22 @@ public PreviewResult previewAction(
if (actionType == ActionType.CREATE_TABLE) {
Preconditions.checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
return new SQLPreviewResult(
- StarRocksSaveModeUtil.getCreateTableSql(
+ StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
template,
tablePath.getDatabaseName(),
tablePath.getTableName(),
- catalogTable.get().getTableSchema()));
+ catalogTable.get().getTableSchema(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()));
} else if (actionType == ActionType.DROP_TABLE) {
- return new SQLPreviewResult(StarRocksSaveModeUtil.getDropTableSql(tablePath, true));
+ return new SQLPreviewResult(
+ StarRocksSaveModeUtil.INSTANCE.getDropTableSql(tablePath, true));
} else if (actionType == ActionType.TRUNCATE_TABLE) {
- return new SQLPreviewResult(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
+ return new SQLPreviewResult(
+ StarRocksSaveModeUtil.INSTANCE.getTruncateTableSql(tablePath));
} else if (actionType == ActionType.CREATE_DATABASE) {
return new SQLPreviewResult(
- StarRocksSaveModeUtil.getCreateDatabaseSql(tablePath.getDatabaseName(), true));
+ StarRocksSaveModeUtil.INSTANCE.getCreateDatabaseSql(
+ tablePath.getDatabaseName(), true));
} else if (actionType == ActionType.DROP_DATABASE) {
return new SQLPreviewResult(
"DROP DATABASE IF EXISTS `" + tablePath.getDatabaseName() + "`");
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index a89f0347e3c..02d3118e071 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -17,95 +17,24 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
-import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.util.CreateTableParser;
+import org.apache.seatunnel.connectors.seatunnel.common.util.CatalogUtil;
import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@Slf4j
-public class StarRocksSaveModeUtil {
-
- public static String getCreateTableSql(
- String template, String database, String table, TableSchema tableSchema) {
- String primaryKey = "";
- if (tableSchema.getPrimaryKey() != null) {
- primaryKey =
- tableSchema.getPrimaryKey().getColumnNames().stream()
- .map(r -> "`" + r + "`")
- .collect(Collectors.joining(","));
- }
- String uniqueKey = "";
- if (!tableSchema.getConstraintKeys().isEmpty()) {
- uniqueKey =
- tableSchema.getConstraintKeys().stream()
- .flatMap(c -> c.getColumnNames().stream())
- .map(r -> "`" + r.getColumnName() + "`")
- .collect(Collectors.joining(","));
- }
- SqlTemplate.canHandledByTemplateWithPlaceholder(
- template,
- SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
- primaryKey,
- TablePath.of(database, table).getFullName(),
- StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
- template =
- template.replaceAll(
- SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
- primaryKey);
- SqlTemplate.canHandledByTemplateWithPlaceholder(
- template,
- SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
- uniqueKey,
- TablePath.of(database, table).getFullName(),
- StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+public class StarRocksSaveModeUtil extends CatalogUtil {
- template =
- template.replaceAll(
- SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
- Map columnInTemplate =
- CreateTableParser.getColumnList(template);
- template = mergeColumnInTemplate(columnInTemplate, tableSchema, template);
-
- String rowTypeFields =
- tableSchema.getColumns().stream()
- .filter(column -> !columnInTemplate.containsKey(column.getName()))
- .map(StarRocksSaveModeUtil::columnToStarrocksType)
- .collect(Collectors.joining(",\n"));
-
- if (template.contains(SaveModePlaceHolder.TABLE_NAME.getPlaceHolder())) {
- // TODO: Remove this compatibility config
- template =
- template.replaceAll(
- SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), table);
- log.warn(
- "The variable placeholder `${table_name}` has been marked as deprecated and will be removed soon, please use `${table}`");
- }
+ public static final StarRocksSaveModeUtil INSTANCE = new StarRocksSaveModeUtil();
- return template.replaceAll(SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), database)
- .replaceAll(SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), table)
- .replaceAll(
- SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
- }
-
- private static String columnToStarrocksType(Column column) {
+ public String columnToConnectorType(Column column) {
checkNotNull(column, "The column is required.");
return String.format(
"`%s` %s %s %s",
@@ -119,46 +48,6 @@ private static String columnToStarrocksType(Column column) {
: "COMMENT '" + column.getComment() + "'");
}
- private static String mergeColumnInTemplate(
- Map columnInTemplate,
- TableSchema tableSchema,
- String template) {
- int offset = 0;
- Map columnMap =
- tableSchema.getColumns().stream()
- .collect(Collectors.toMap(Column::getName, Function.identity()));
- List columnInfosInSeq =
- columnInTemplate.values().stream()
- .sorted(
- Comparator.comparingInt(
- CreateTableParser.ColumnInfo::getStartIndex))
- .collect(Collectors.toList());
- for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) {
- String col = columnInfo.getName();
- if (StringUtils.isEmpty(columnInfo.getInfo())) {
- if (columnMap.containsKey(col)) {
- Column column = columnMap.get(col);
- String newCol = columnToStarrocksType(column);
- String prefix = template.substring(0, columnInfo.getStartIndex() + offset);
- String suffix = template.substring(offset + columnInfo.getEndIndex());
- if (prefix.endsWith("`")) {
- prefix = prefix.substring(0, prefix.length() - 1);
- offset--;
- }
- if (suffix.startsWith("`")) {
- suffix = suffix.substring(1);
- offset--;
- }
- template = prefix + newCol + suffix;
- offset += newCol.length() - columnInfo.getName().length();
- } else {
- throw new IllegalArgumentException("Can't find column " + col + " in table.");
- }
- }
- }
- return template;
- }
-
private static String dataTypeToStarrocksType(SeaTunnelDataType> dataType, long length) {
checkNotNull(dataType, "The SeaTunnel's data type is required.");
switch (dataType.getSqlType()) {
@@ -207,32 +96,4 @@ private static String dataTypeToStarrocksType(SeaTunnelDataType> dataType, lon
}
throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType);
}
-
- public static String getCreateDatabaseSql(String database, boolean ignoreIfExists) {
- if (ignoreIfExists) {
- return "CREATE DATABASE IF NOT EXISTS `" + database + "`";
- } else {
- return "CREATE DATABASE `" + database + "`";
- }
- }
-
- public static String getDropDatabaseSql(String database, boolean ignoreIfNotExists) {
- if (ignoreIfNotExists) {
- return "DROP DATABASE IF EXISTS `" + database + "`";
- } else {
- return "DROP DATABASE `" + database + "`";
- }
- }
-
- public static String getDropTableSql(TablePath tablePath, boolean ignoreIfNotExists) {
- if (ignoreIfNotExists) {
- return "DROP TABLE IF EXISTS " + tablePath.getFullName();
- } else {
- return "DROP TABLE " + tablePath.getFullName();
- }
- }
-
- public static String getTruncateTableSql(TablePath tablePath) {
- return "TRUNCATE TABLE " + tablePath.getFullName();
- }
}
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index 763413335aa..c4d06167335 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -65,7 +65,7 @@ public void test() {
PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) null, true, null, ""));
String result =
- StarRocksSaveModeUtil.getCreateTableSql(
+ StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
"CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n"
+ "${rowtype_primary_key} , \n"
+ "${rowtype_unique_key} , \n"
@@ -110,7 +110,8 @@ public void test() {
.ColumnSortType
.ASC)))))
.columns(columns)
- .build());
+ .build(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
Assertions.assertEquals(
"CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n"
+ "`id` BIGINT NULL ,`age` INT NULL , \n"
@@ -149,11 +150,12 @@ public void test() {
Assertions.assertThrows(
RuntimeException.class,
() ->
- StarRocksSaveModeUtil.getCreateTableSql(
+ StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
createTemplate,
tablePath.getDatabaseName(),
tablePath.getTableName(),
- catalogTable.getTableSchema()));
+ catalogTable.getTableSchema(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()));
String primaryKeyHolder = SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
CommonError.sqlTemplateHandledError(
@@ -233,7 +235,7 @@ public void testInSeq() {
"L_COMMENT", BasicType.STRING_TYPE, (Long) null, false, null, ""));
String result =
- StarRocksSaveModeUtil.getCreateTableSql(
+ StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
"CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n"
+ "`L_COMMITDATE`,\n"
+ "${rowtype_primary_key},\n"
@@ -252,7 +254,8 @@ public void testInSeq() {
PrimaryKey.of(
"", Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
.columns(columns)
- .build());
+ .build(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
String expected =
"CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
+ "`L_COMMITDATE` DATE NOT NULL ,\n"
@@ -290,7 +293,7 @@ public void testWithVarchar() {
columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE, 70000, true, null, ""));
String result =
- StarRocksSaveModeUtil.getCreateTableSql(
+ StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
"CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n"
+ "${rowtype_primary_key} , \n"
+ "`create_time` DATETIME NOT NULL , \n"
@@ -312,7 +315,8 @@ public void testWithVarchar() {
TableSchema.builder()
.primaryKey(PrimaryKey.of("", Arrays.asList("id", "age")))
.columns(columns)
- .build());
+ .build(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
Assertions.assertEquals(
"CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n"
@@ -347,7 +351,7 @@ public void testWithThreePrimaryKeys() {
columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE, 70000, true, null, ""));
String result =
- StarRocksSaveModeUtil.getCreateTableSql(
+ StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
"create table '${database}'.'${table}'(\n"
+ " ${rowtype_fields}\n"
+ " )\n"
@@ -358,7 +362,8 @@ public void testWithThreePrimaryKeys() {
.primaryKey(
PrimaryKey.of("test", Arrays.asList("id", "age", "name")))
.columns(columns)
- .build());
+ .build(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
Assertions.assertEquals(
"create table 'test1'.'test2'(\n"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 76bdfaa2816..b830a113893 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -107,6 +107,86 @@ public void testSourceParallelism(TestContainer container) throws Exception {
Assertions.assertEquals(0, execResult.getExitCode());
}
+ @TestTemplate
+ public void clickhouseWithCreateSchemaWhenNotExist(TestContainer container) throws Exception {
+ String tableName = "default.sink_table_for_schema";
+ Container.ExecResult execResult =
+ container.executeJob("/clickhouse_with_create_schema_when_not_exist.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(100, countData(tableName));
+ execResult = container.executeJob("/clickhouse_with_create_schema_when_not_exist.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(200, countData(tableName));
+ dropTable(tableName);
+ }
+
+ @TestTemplate
+ public void clickhouseWithRecreateSchemaAndAppendData(TestContainer container)
+ throws Exception {
+ String tableName = "default.sink_table_for_schema";
+ Container.ExecResult execResult =
+ container.executeJob("/clickhouse_with_recreate_schema_and_append_data.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(100, countData(tableName));
+ execResult = container.executeJob("/clickhouse_with_recreate_schema_and_append_data.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(100, countData(tableName));
+ dropTable(tableName);
+ }
+
+ @TestTemplate
+ public void clickhouseWithErrorWhenSchemaNotExist(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob("/clickhouse_with_error_when_schema_not_exist.conf");
+ Assertions.assertEquals(1, execResult.getExitCode());
+ Assertions.assertTrue(
+ execResult
+ .getStderr()
+ .contains(
+ "ErrorCode:[API-11], ErrorDescription:[The sink table not exist]"));
+ }
+
+ @TestTemplate
+ public void clickhouseWithCreateSchemaWhenNotExistAndDropData(TestContainer container)
+ throws Exception {
+ String tableName = "default.sink_table_for_schema";
+ Container.ExecResult execResult =
+ container.executeJob(
+ "/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(100, countData(tableName));
+ execResult =
+ container.executeJob(
+ "/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(100, countData(tableName));
+ dropTable(tableName);
+ }
+
+ @TestTemplate
+ public void clickhouseWithErrorWhenDataExists(TestContainer container) throws Exception {
+ String tableName = "default.sink_table_for_schema";
+ Container.ExecResult execResult =
+ container.executeJob("/clickhouse_with_error_when_data_exists.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(100, countData(tableName));
+ execResult = container.executeJob("/clickhouse_with_error_when_data_exists.conf");
+ Assertions.assertEquals(1, execResult.getExitCode());
+ Assertions.assertTrue(
+ execResult.getStderr().contains("The target data source already has data"));
+ dropTable(tableName);
+ }
+
+ @TestTemplate
+ public void clickhouseRecreateSchemaAndCustom(TestContainer container) throws Exception {
+ String tableName = "default.sink_table_for_schema";
+ Container.ExecResult execResult =
+ container.executeJob("/clickhouse_with_recreate_schema_and_custom.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(101, countData(tableName));
+ dropTable(tableName);
+ }
+
@BeforeAll
@Override
public void startUp() throws Exception {
@@ -194,6 +274,29 @@ private Array toSqlArray(Object value) throws SQLException {
return connection.createArrayOf(sqlType, elements);
}
+ private int countData(String tableName) {
+ try {
+ String sql = "select count(1) from " + tableName;
+ ResultSet resultSet = this.connection.createStatement().executeQuery(sql);
+ if (resultSet.next()) {
+ return resultSet.getInt(1);
+ } else {
+ return -1;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void dropTable(String tableName) {
+ try {
+ Statement statement = this.connection.createStatement();
+ statement.execute("drop table if exists " + tableName);
+ } catch (SQLException e) {
+ throw new RuntimeException("Drop table failed!", e);
+ }
+ }
+
private void batchInsertData() {
String sql = CONFIG.getString(INSERT_SQL);
PreparedStatement preparedStatement = null;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist.conf
new file mode 100644
index 00000000000..a05d856ff60
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ c_map = "map"
+ c_array = "array"
+ }
+ primaryKey {
+ name = "c_string"
+ columnNames = [c_string]
+ }
+ }
+ row.num = 100
+ }
+}
+
+sink {
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ table = "sink_table_for_schema"
+ username = "default"
+ password = ""
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
+ primary_key = "c_string"
+ support_upsert = true
+ allow_experimental_lightweight_delete = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf
new file mode 100644
index 00000000000..cbb772e6da4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ c_map = "map"
+ c_array = "array"
+ }
+ primaryKey {
+ name = "c_string"
+ columnNames = [c_string]
+ }
+ }
+ row.num = 100
+ }
+}
+
+sink {
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ table = "sink_table_for_schema"
+ username = "default"
+ password = ""
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="DROP_DATA"
+ primary_key = "c_string"
+ support_upsert = true
+ allow_experimental_lightweight_delete = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_data_exists.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_data_exists.conf
new file mode 100644
index 00000000000..42d4ce8fb15
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_data_exists.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ c_map = "map"
+ c_array = "array"
+ }
+ primaryKey {
+ name = "c_string"
+ columnNames = [c_string]
+ }
+ }
+ row.num = 100
+ }
+}
+
+sink {
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ table = "sink_table_for_schema"
+ username = "default"
+ password = ""
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="ERROR_WHEN_DATA_EXISTS"
+ primary_key = "c_string"
+ support_upsert = true
+ allow_experimental_lightweight_delete = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_schema_not_exist.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_schema_not_exist.conf
new file mode 100644
index 00000000000..39983a90f5a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_schema_not_exist.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ c_map = "map"
+ c_array = "array"
+ }
+ primaryKey {
+ name = "c_string"
+ columnNames = [c_string]
+ }
+ }
+ row.num = 100
+ }
+}
+
+sink {
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ table = "sink_table_for_schema"
+ username = "default"
+ password = ""
+ "schema_save_mode"="ERROR_WHEN_SCHEMA_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
+ primary_key = "c_string"
+ support_upsert = true
+ allow_experimental_lightweight_delete = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_append_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_append_data.conf
new file mode 100644
index 00000000000..057252aeba4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_append_data.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ c_map = "map"
+ c_array = "array"
+ }
+ primaryKey {
+ name = "c_string"
+ columnNames = [c_string]
+ }
+ }
+ row.num = 100
+ }
+}
+
+sink {
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ table = "sink_table_for_schema"
+ username = "default"
+ password = ""
+ "schema_save_mode"="RECREATE_SCHEMA"
+ "data_save_mode"="APPEND_DATA"
+ primary_key = "c_string"
+ support_upsert = true
+ allow_experimental_lightweight_delete = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_custom.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_custom.conf
new file mode 100644
index 00000000000..0d3e56cd8b2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_custom.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ c_map = "map"
+ c_array = "array"
+ }
+ primaryKey {
+ name = "c_string"
+ columnNames = [c_string]
+ }
+ }
+ row.num = 100
+ }
+}
+
+sink {
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ table = "sink_table_for_schema"
+ username = "default"
+ password = ""
+ custom_sql="INSERT INTO default.sink_table_for_schema ( c_string) VALUES ( '1' );"
+ "schema_save_mode"="RECREATE_SCHEMA"
+ "data_save_mode"="CUSTOM_PROCESSING"
+ primary_key = "c_string"
+ support_upsert = true
+ allow_experimental_lightweight_delete = true
+ }
+}
\ No newline at end of file