diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java deleted file mode 100644 index 17ffdd2d406..00000000000 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink; - -import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableSinkFactory; - -import com.google.auto.service.AutoService; - -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; - -@AutoService(Factory.class) -public class ClickhouseSinkFactory implements TableSinkFactory { - @Override - public String factoryIdentifier() { - return "Clickhouse"; - } - - @Override - public OptionRule optionRule() { - return OptionRule.builder() - .required(HOST, DATABASE, TABLE) - .optional( - CLICKHOUSE_CONFIG, - BULK_SIZE, - SPLIT_MODE, - SHARDING_KEY, - PRIMARY_KEY, - SUPPORT_UPSERT, - ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE) - .bundled(USERNAME, PASSWORD) - .build(); - } -} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java index 52397229dbc..04ee5755e59 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java @@ -18,7 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.IntHolder; import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java index d2de6fd182b..22f18694e23 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java @@ -17,210 +17,35 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; - -import com.clickhouse.client.ClickHouseNode; -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.Properties; - -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; -@AutoService(SeaTunnelSink.class) public class ClickhouseSink implements SeaTunnelSink { private ReaderOption option; + private CatalogTable catalogTable; - @Override - public String getPluginName() { - return "Clickhouse"; + public ClickhouseSink(ReaderOption option, CatalogTable catalogTable) { + this.option = option; + this.catalogTable = catalogTable; } @Override - public void prepare(Config config) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists(config, HOST.key(), DATABASE.key(), TABLE.key()); - - boolean isCredential = config.hasPath(USERNAME.key()) || config.hasPath(PASSWORD.key()); - - if (isCredential) { - result = CheckConfigUtil.checkAllExists(config, USERNAME.key(), PASSWORD.key()); - } - - if (!result.isSuccess()) { - throw new ClickhouseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - Map defaultConfig = - ImmutableMap.builder() - .put(BULK_SIZE.key(), BULK_SIZE.defaultValue()) - .put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue()) - .put(SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue()) - .build(); - - config = config.withFallback(ConfigFactory.parseMap(defaultConfig)); - - List nodes; - if (!isCredential) { - nodes = - ClickhouseUtil.createNodes( - config.getString(HOST.key()), - config.getString(DATABASE.key()), - config.getString(SERVER_TIME_ZONE.key()), - null, - null, - null); - } else { - nodes = - ClickhouseUtil.createNodes( - config.getString(HOST.key()), - config.getString(DATABASE.key()), - config.getString(SERVER_TIME_ZONE.key()), - config.getString(USERNAME.key()), - config.getString(PASSWORD.key()), - null); - } - - Properties clickhouseProperties = new Properties(); - if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) { - config.getObject(CLICKHOUSE_CONFIG.key()) - .forEach( - (key, value) -> - clickhouseProperties.put( - key, String.valueOf(value.unwrapped()))); - } - - if (isCredential) { - clickhouseProperties.put("user", config.getString(USERNAME.key())); - clickhouseProperties.put("password", config.getString(PASSWORD.key())); - } - - ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0)); - Map tableSchema = - proxy.getClickhouseTableSchema(config.getString(TABLE.key())); - String shardKey = null; - String shardKeyType = null; - ClickhouseTable table = - proxy.getClickhouseTable( - config.getString(DATABASE.key()), config.getString(TABLE.key())); - if (config.getBoolean(SPLIT_MODE.key())) { - if (!"Distributed".equals(table.getEngine())) { - throw new ClickhouseConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - "split mode only support table which engine is " - + "'Distributed' engine at now"); - } - if (config.hasPath(SHARDING_KEY.key())) { - shardKey = config.getString(SHARDING_KEY.key()); - shardKeyType = tableSchema.get(shardKey); - } - } - ShardMetadata metadata; - - if (isCredential) { - metadata = - new ShardMetadata( - shardKey, - shardKeyType, - table.getSortingKey(), - config.getString(DATABASE.key()), - config.getString(TABLE.key()), - table.getEngine(), - config.getBoolean(SPLIT_MODE.key()), - new Shard(1, 1, nodes.get(0)), - config.getString(USERNAME.key()), - config.getString(PASSWORD.key())); - } else { - metadata = - new ShardMetadata( - shardKey, - shardKeyType, - table.getSortingKey(), - config.getString(DATABASE.key()), - config.getString(TABLE.key()), - table.getEngine(), - config.getBoolean(SPLIT_MODE.key()), - new Shard(1, 1, nodes.get(0))); - } - - proxy.close(); - - String[] primaryKeys = null; - if (config.hasPath(PRIMARY_KEY.key())) { - String primaryKey = config.getString(PRIMARY_KEY.key()); - if (shardKey != null && !Objects.equals(primaryKey, shardKey)) { - throw new ClickhouseConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - "sharding_key and primary_key must be consistent to ensure correct processing of cdc events"); - } - primaryKeys = new String[] {primaryKey}; - } - boolean supportUpsert = SUPPORT_UPSERT.defaultValue(); - if (config.hasPath(SUPPORT_UPSERT.key())) { - supportUpsert = config.getBoolean(SUPPORT_UPSERT.key()); - } - boolean allowExperimentalLightweightDelete = - ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.defaultValue(); - if (config.hasPath(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key())) { - allowExperimentalLightweightDelete = - config.getBoolean(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key()); - } - this.option = - ReaderOption.builder() - .shardMetadata(metadata) - .properties(clickhouseProperties) - .tableEngine(table.getEngine()) - .tableSchema(tableSchema) - .bulkSize(config.getInt(BULK_SIZE.key())) - .primaryKeys(primaryKeys) - .supportUpsert(supportUpsert) - .allowExperimentalLightweightDelete(allowExperimentalLightweightDelete) - .build(); + public String getPluginName() { + return "Clickhouse"; } @Override @@ -241,7 +66,7 @@ public Optional> getWriterStateSerializer() { } @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.option.setSeaTunnelRowType(seaTunnelRowType); + public Optional getWriteCatalogTable() { + return Optional.of(catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java new file mode 100644 index 00000000000..720efacc321 --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; + +import com.clickhouse.client.ClickHouseNode; +import com.google.auto.service.AutoService; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; + +@AutoService(Factory.class) +public class ClickhouseSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Clickhouse"; + } + + @Override + public TableSink createSink( + TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + List nodes = ClickhouseUtil.createNodes(readonlyConfig); + Properties clickhouseProperties = new Properties(); + readonlyConfig + .get(CLICKHOUSE_CONFIG) + .forEach((key, value) -> clickhouseProperties.put(key, String.valueOf(value))); + + clickhouseProperties.put("user", readonlyConfig.get(USERNAME)); + clickhouseProperties.put("password", readonlyConfig.get(PASSWORD)); + ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0)); + try { + Map tableSchema = + proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE)); + String shardKey = null; + String shardKeyType = null; + ClickhouseTable table = + proxy.getClickhouseTable( + readonlyConfig.get(DATABASE), readonlyConfig.get(TABLE)); + if (readonlyConfig.get(SPLIT_MODE)) { + if (!"Distributed".equals(table.getEngine())) { + throw new ClickhouseConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + "split mode only support table which engine is " + + "'Distributed' engine at now"); + } + if (readonlyConfig.getOptional(SHARDING_KEY).isPresent()) { + shardKey = readonlyConfig.get(SHARDING_KEY); + shardKeyType = tableSchema.get(shardKey); + } + } + ShardMetadata metadata = + new ShardMetadata( + shardKey, + shardKeyType, + table.getSortingKey(), + readonlyConfig.get(DATABASE), + readonlyConfig.get(TABLE), + table.getEngine(), + readonlyConfig.get(SPLIT_MODE), + new Shard(1, 1, nodes.get(0)), + readonlyConfig.get(USERNAME), + readonlyConfig.get(PASSWORD)); + proxy.close(); + String[] primaryKeys = null; + if (readonlyConfig.getOptional(PRIMARY_KEY).isPresent()) { + String primaryKey = readonlyConfig.get(PRIMARY_KEY); + if (shardKey != null && !Objects.equals(primaryKey, shardKey)) { + throw new ClickhouseConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + "sharding_key and primary_key must be consistent to ensure correct processing of cdc events"); + } + primaryKeys = new String[] {primaryKey}; + } + boolean supportUpsert = readonlyConfig.get(SUPPORT_UPSERT); + boolean allowExperimentalLightweightDelete = + readonlyConfig.get(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE); + + ReaderOption option = + ReaderOption.builder() + .shardMetadata(metadata) + .properties(clickhouseProperties) + .seaTunnelRowType(catalogTable.getSeaTunnelRowType()) + .tableEngine(table.getEngine()) + .tableSchema(tableSchema) + .bulkSize(readonlyConfig.get(BULK_SIZE)) + .primaryKeys(primaryKeys) + .supportUpsert(supportUpsert) + .allowExperimentalLightweightDelete(allowExperimentalLightweightDelete) + .build(); + return () -> new ClickhouseSink(option, catalogTable); + } finally { + proxy.close(); + } + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(HOST, DATABASE, TABLE) + .optional( + CLICKHOUSE_CONFIG, + BULK_SIZE, + SPLIT_MODE, + SHARDING_KEY, + PRIMARY_KEY, + SUPPORT_UPSERT, + ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE) + .bundled(USERNAME, PASSWORD) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java index b5f1505d112..6b7f652aba4 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java @@ -28,7 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.IntHolder; import org.apache.commons.lang3.StringUtils; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java index 140e40b3b13..03f6efec311 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java @@ -21,7 +21,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine; import org.apache.commons.lang3.StringUtils; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java index bb445d42820..4a0c80e02c5 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -38,10 +38,10 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; import com.clickhouse.client.ClickHouseNode; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java index 53e5fcb5ab0..5d69191cac0 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java @@ -21,9 +21,9 @@ import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy; import com.clickhouse.client.ClickHouseException; import com.clickhouse.client.ClickHouseRequest; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java index 2abeb046470..e705acc7683 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java @@ -26,10 +26,10 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy; import org.apache.commons.io.FileUtils; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java index 546f1f74660..2525caeb3d0 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine; import lombok.Getter; import lombok.Setter; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index 2cc401dce24..d7c6b438564 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -17,142 +17,39 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.source.SupportColumnProjection; import org.apache.seatunnel.api.source.SupportParallelism; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.common.utils.ExceptionUtils; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; -import com.clickhouse.client.ClickHouseClient; -import com.clickhouse.client.ClickHouseException; -import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseNode; -import com.clickhouse.client.ClickHouseResponse; -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; - -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; -@AutoService(SeaTunnelSource.class) public class ClickhouseSource implements SeaTunnelSource, SupportParallelism, SupportColumnProjection { private List servers; - private SeaTunnelRowType rowTypeInfo; + private CatalogTable catalogTable; private String sql; - @Override - public String getPluginName() { - return "Clickhouse"; + public ClickhouseSource(List servers, CatalogTable catalogTable, String sql) { + this.servers = servers; + this.catalogTable = catalogTable; + this.sql = sql; } @Override - public void prepare(Config config) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists( - config, - HOST.key(), - DATABASE.key(), - SQL.key(), - USERNAME.key(), - PASSWORD.key()); - if (!result.isSuccess()) { - throw new ClickhouseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - Map defaultConfig = - ImmutableMap.builder() - .put(SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue()) - .build(); - - config = config.withFallback(ConfigFactory.parseMap(defaultConfig)); - - Map customConfig = null; - - if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) { - customConfig = - config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entrySet -> - entrySet.getValue().unwrapped().toString())); - } - - servers = - ClickhouseUtil.createNodes( - config.getString(HOST.key()), - config.getString(DATABASE.key()), - config.getString(SERVER_TIME_ZONE.key()), - config.getString(USERNAME.key()), - config.getString(PASSWORD.key()), - customConfig); - - sql = config.getString(SQL.key()); - ClickHouseNode currentServer = - servers.get(ThreadLocalRandom.current().nextInt(servers.size())); - try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); - ClickHouseResponse response = - client.connect(currentServer) - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(modifySQLToLimit1(config.getString(SQL.key()))) - .executeAndWait()) { - - int columnSize = response.getColumns().size(); - String[] fieldNames = new String[columnSize]; - SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize]; - - for (int i = 0; i < columnSize; i++) { - fieldNames[i] = response.getColumns().get(i).getColumnName(); - seaTunnelDataTypes[i] = TypeConvertUtil.convert(response.getColumns().get(i)); - } - - this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); - - } catch (ClickHouseException e) { - throw new ClickhouseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, ExceptionUtils.getMessage(e))); - } - } - - private String modifySQLToLimit1(String sql) { - return String.format("SELECT * FROM (%s) s LIMIT 1", sql); + public String getPluginName() { + return "Clickhouse"; } @Override @@ -161,14 +58,15 @@ public Boundedness getBoundedness() { } @Override - public SeaTunnelRowType getProducedType() { - return this.rowTypeInfo; + public List getProducedCatalogTables() { + return Collections.singletonList(catalogTable); } @Override public SourceReader createReader( SourceReader.Context readerContext) throws Exception { - return new ClickhouseSourceReader(servers, readerContext, this.rowTypeInfo, sql); + return new ClickhouseSourceReader( + servers, readerContext, this.catalogTable.getSeaTunnelRowType(), sql); } @Override diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java index 4adea4b80ce..bb91d3c05ea 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java @@ -17,13 +17,37 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseColumn; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseResponse; import com.google.auto.service.AutoService; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; @@ -38,6 +62,61 @@ public String factoryIdentifier() { return "Clickhouse"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + List nodes = ClickhouseUtil.createNodes(readonlyConfig); + + String sql = readonlyConfig.get(SQL); + ClickHouseNode currentServer = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); + try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); + ClickHouseResponse response = + client.connect(currentServer) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query(modifySQLToLimit1(sql)) + .executeAndWait()) { + TableSchema.Builder builder = TableSchema.builder(); + List columns = response.getColumns(); + columns.forEach( + column -> { + PhysicalColumn physicalColumn = + PhysicalColumn.of( + column.getColumnName(), + TypeConvertUtil.convert(column), + (long) column.getEstimatedLength(), + column.getScale(), + column.isNullable(), + null, + null); + builder.column(physicalColumn); + }); + String catalogName = "clickhouse_catalog"; + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of( + catalogName, readonlyConfig.get(DATABASE), "default"), + builder.build(), + Collections.emptyMap(), + Collections.emptyList(), + "", + catalogName); + return () -> + (SeaTunnelSource) + new ClickhouseSource(nodes, catalogTable, sql); + } catch (ClickHouseException e) { + throw new ClickhouseConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + factoryIdentifier(), PluginType.SOURCE, e.getMessage())); + } + } + + private String modifySQLToLimit1(String sql) { + return String.format("SELECT * FROM (%s) s LIMIT 1", sql); + } + @Override public OptionRule optionRule() { return OptionRule.builder() diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java similarity index 98% rename from seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java rename to seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java index bf0f9a55520..c4178182578 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; +package org.apache.seatunnel.connectors.seatunnel.clickhouse.util; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable; import com.clickhouse.client.ClickHouseClient; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java index f787cf5c8fc..13667d0e407 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.util; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig; + import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -31,6 +34,16 @@ public class ClickhouseUtil { + public static List createNodes(ReadonlyConfig config) { + return createNodes( + config.get(ClickhouseConfig.HOST), + config.get(ClickhouseConfig.DATABASE), + config.get(ClickhouseConfig.SERVER_TIME_ZONE), + config.get(ClickhouseConfig.USERNAME), + config.get(ClickhouseConfig.PASSWORD), + null); + } + public static List createNodes( String nodeAddress, String database, diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java similarity index 94% rename from seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java rename to seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java index 067f09fdbc2..8974b7cd0c3 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink; +package org.apache.seatunnel.connectors.seatunnel.clickhouse.util; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java similarity index 94% rename from seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java rename to seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java index 02e7be5966d..9913d7a408e 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.clickhouse.tool; +package org.apache.seatunnel.connectors.seatunnel.clickhouse.util; import java.io.Serializable; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java index e6c50b0611a..d193b53ea72 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkFactory; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkFactory; import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;