From 7b2f53831025c5ac1c74c05fec3b88128f705ea6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=80=81=E7=8E=8B?=
<58297137+chl-wxp@users.noreply.github.com>
Date: Tue, 9 Jul 2024 10:32:09 +0800
Subject: [PATCH] [feature][connector-file-local] add save mode function for
localfile (#7080)
---
docs/en/connector-v2/sink/LocalFile.md | 18 +++
.../file/catalog/AbstractFileCatalog.java | 131 ++++++++++++++++++
.../seatunnel/file/config/BaseSinkConfig.java | 22 +++
.../file/sink/BaseMultipleTableFileSink.java | 34 ++++-
.../file/local/catalog/LocalFileCatalog.java | 29 ++++
.../catalog/LocalFileCatalogFactory.java | 53 +++++++
.../file/local/sink/LocalFileSinkFactory.java | 2 +
.../e2e/connector/file/local/LocalFileIT.java | 87 +++++++++++-
.../json/fake_to_local_file_json.conf | 2 +
.../fake_to_local_file_json_save_mode.conf | 63 +++++++++
.../batch_last_checkpoint_error.conf | 14 +-
11 files changed, 441 insertions(+), 14 deletions(-)
create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalog.java
create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalogFactory.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json_save_mode.conf
diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index b0d6b45bd1c..b0d41419d50 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -60,6 +60,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. |
| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |
+| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method |
+| data_save_mode | string | no | APPEND_DATA | Existing data processing method |
### path [string]
@@ -205,6 +207,20 @@ Only used when file_format_type is text,csv.false:don't write header,true:write
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.
+### schema_save_mode [string]
+
+Existing dir processing method.
+- RECREATE_SCHEMA: creates the dir when it does not exist; deletes and recreates it when it already exists
+- CREATE_SCHEMA_WHEN_NOT_EXIST: creates the dir when it does not exist; does nothing when it already exists
+- ERROR_WHEN_SCHEMA_NOT_EXIST: an error is reported when the dir does not exist
+
+### data_save_mode [string]
+
+Existing data processing method.
+- DROP_DATA: preserve the dir and delete the existing data files
+- APPEND_DATA: preserve the dir and the existing data files
+- ERROR_WHEN_DATA_EXISTS: an error is reported when data files already exist
+
## Example
For orc file format simple config
@@ -278,6 +294,8 @@ LocalFile {
file_format_type="excel"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
+ schema_save_mode=RECREATE_SCHEMA
+ data_save_mode=DROP_DATA
}
```
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
new file mode 100644
index 00000000000..f7a1b46a8b4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import lombok.SneakyThrows;
+
+import java.util.List;
+
+public abstract class AbstractFileCatalog implements Catalog {
+ // All table operations act on the one directory filePath; TablePath arguments are ignored.
+ protected final String catalogName;
+ private final HadoopFileSystemProxy hadoopFileSystemProxy;
+ private final String filePath;
+
+ protected AbstractFileCatalog(
+ HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
+ this.catalogName = catalogName;
+ this.filePath = filePath;
+ this.hadoopFileSystemProxy = hadoopFileSystemProxy;
+ }
+
+ @Override
+ public void open() throws CatalogException {} // no-op
+
+ @Override
+ public void close() throws CatalogException {} // no-op; the proxy is not closed here
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return null; // not implemented: always null
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ return false; // not implemented: always false
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return null; // not implemented: always null
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ return null; // not implemented: always null
+ }
+
+ @SneakyThrows
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ return hadoopFileSystemProxy.fileExist(filePath); // the "table" is the filePath itself
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ return null; // not implemented: always null
+ }
+
+ @SneakyThrows
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ hadoopFileSystemProxy.createDir(filePath); // ignoreIfExists is not consulted
+ }
+
+ @SneakyThrows
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ hadoopFileSystemProxy.deleteFile(filePath); // ignoreIfNotExists is not consulted
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {} // no-op
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {} // no-op
+
+ @SneakyThrows
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ hadoopFileSystemProxy.deleteFile(filePath); // delete filePath ...
+ hadoopFileSystemProxy.createDir(filePath); // ... then re-create it as a directory
+ }
+
+ @SneakyThrows
+ @Override
+ public boolean isExistsData(TablePath tablePath) {
+ final List<LocatedFileStatus> locatedFileStatuses =
+ hadoopFileSystemProxy.listFile(filePath);
+ return CollectionUtils.isNotEmpty(locatedFileStatuses); // true when listFile returns entries
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 48a2d2b4369..0759baf9e4f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -19,6 +19,8 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
@@ -28,6 +30,10 @@
import java.util.Collections;
import java.util.List;
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
public class BaseSinkConfig {
public static final String SEATUNNEL = "seatunnel";
public static final String NON_PARTITION = "NON_PARTITION";
@@ -293,4 +299,20 @@ public class BaseSinkConfig {
.defaultValue(Collections.emptyList())
.withDescription(
"Support writing Parquet INT96 from a 12-byte field, only valid for parquet files.");
+
+ public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription(
+ "Before the synchronization task begins, process the existing path");
+ // Selectable values are limited to DROP_DATA, APPEND_DATA and ERROR_WHEN_DATA_EXISTS.
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .singleChoice(
+ DataSaveMode.class,
+ Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
+ .defaultValue(APPEND_DATA)
+ .withDescription(
+ "Before the synchronization task begins, different processing of data files that already exist in the directory");
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index 6beb62d7e83..1ae4b840295 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -21,12 +21,20 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
@@ -39,20 +47,25 @@
import java.util.List;
import java.util.Optional;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+
public abstract class BaseMultipleTableFileSink
implements SeaTunnelSink<
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
- SupportMultiTableSink {
+ SupportMultiTableSink,
+ SupportSaveMode {
private final HadoopConf hadoopConf;
private final CatalogTable catalogTable;
private final FileSinkConfig fileSinkConfig;
private String jobId;
+ private final ReadonlyConfig readonlyConfig;
public abstract String getPluginName();
public BaseMultipleTableFileSink(
HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+ this.readonlyConfig = readonlyConfig;
this.hadoopConf = hadoopConf;
this.fileSinkConfig =
new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
@@ -103,4 +116,23 @@ protected WriteStrategy createWriteStrategy() {
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
return writeStrategy;
}
+
+ @Override
+ public Optional<SaveModeHandler> getSaveModeHandler() {
+ // Look up the CatalogFactory registered under this sink's plugin name.
+ CatalogFactory catalogFactory =
+ discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CatalogFactory.class,
+ getPluginName());
+ if (catalogFactory == null) {
+ return Optional.empty();
+ }
+ final Catalog catalog = catalogFactory.createCatalog(getPluginName(), readonlyConfig);
+ SchemaSaveMode schemaSaveMode = readonlyConfig.get(BaseSinkConfig.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode = readonlyConfig.get(BaseSinkConfig.DATA_SAVE_MODE);
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ schemaSaveMode, dataSaveMode, catalog, catalogTable, null));
+ }
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalog.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalog.java
new file mode 100644
index 00000000000..ab784f56eac
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalog.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.catalog;
+
+import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+
+public class LocalFileCatalog extends AbstractFileCatalog {
+ // Adds no behaviour of its own; the constructor forwards its arguments to the superclass.
+ public LocalFileCatalog(
+ HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
+ super(hadoopFileSystemProxy, filePath, catalogName);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalogFactory.java
new file mode 100644
index 00000000000..ac1c6b37baf
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalogFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class LocalFileCatalogFactory implements CatalogFactory {
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ // catalogName is ignored: the catalog is always named after factoryIdentifier().
+ final HadoopFileSystemProxy localFsProxy =
+ new HadoopFileSystemProxy(new LocalFileHadoopConf());
+ final String targetPath = options.get(BaseSourceConfigOptions.FILE_PATH);
+ final String name = factoryIdentifier();
+ return new LocalFileCatalog(localFsProxy, targetPath, name);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FileSystemType.LOCAL.getFileSystemPluginName();
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index 01d5fb1060c..fc699b42962 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -46,6 +46,8 @@ public OptionRule optionRule() {
return OptionRule.builder()
.required(BaseSinkConfig.FILE_PATH)
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
+ .optional(BaseSinkConfig.DATA_SAVE_MODE)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index 51494b50596..e278159efab 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.e2e.connector.file.local;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.seatunnel.connectors.seatunnel.file.local.catalog.LocalFileCatalog;
+import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -27,27 +32,43 @@
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import io.airlift.compress.lzo.LzopCodec;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
@DisabledOnContainer(
value = {TestContainerId.SPARK_2_4},
type = {},
disabledReason = "The apache-compress version is not compatible with apache-poi")
+@Slf4j
public class LocalFileIT extends TestSuiteBase {
+ private GenericContainer<?> baseContainer; // set in extendedFactory, used for docker exec
+
/** Copy data files to container */
@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
container -> {
+ this.baseContainer = container;
ContainerUtil.copyFileIntoContainers(
"/json/e2e.json",
"/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
@@ -121,7 +142,6 @@ public class LocalFileIT extends TestSuiteBase {
public void testLocalFileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
TestHelper helper = new TestHelper(container);
-
helper.execute("/excel/fake_to_local_excel.conf");
helper.execute("/excel/local_excel_to_assert.conf");
helper.execute("/excel/local_excel_projection_to_assert.conf");
@@ -181,6 +201,71 @@ public void testLocalFileReadAndWrite(TestContainer container)
}
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {TestContainerId.SPARK_2_4},
+ type = {EngineType.FLINK},
+ disabledReason =
+ "Flink test is multi-node, LocalFile connector will use different containers for obtaining files")
+ public void testLocalFileReadAndWriteWithSaveMode(TestContainer container)
+ throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
+ // RECREATE_SCHEMA + DROP_DATA: the second run must leave one entry, not two
+ String path = "/tmp/seatunnel/localfile/json/fake";
+ Assertions.assertEquals(0, getFileListFromContainer(path).size());
+ helper.execute("/json/fake_to_local_file_json_save_mode.conf");
+ Assertions.assertEquals(1, getFileListFromContainer(path).size());
+ helper.execute("/json/fake_to_local_file_json_save_mode.conf");
+ Assertions.assertEquals(1, getFileListFromContainer(path).size());
+ }
+
+ @SneakyThrows
+ private List<String> getFileListFromContainer(String path) {
+ String command = "ls -1 " + path;
+ ExecCreateCmdResponse execCreateCmdResponse =
+ dockerClient
+ .execCreateCmd(baseContainer.getContainerId())
+ .withCmd("sh", "-c", command)
+ .withAttachStdout(true)
+ .withAttachStderr(true)
+ .exec();
+ // Only stdout is collected below; stderr of the command is forwarded to System.err.
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ dockerClient
+ .execStartCmd(execCreateCmdResponse.getId())
+ .exec(new ExecStartResultCallback(outputStream, System.err))
+ .awaitCompletion();
+
+ String output = new String(outputStream.toByteArray(), StandardCharsets.UTF_8).trim();
+ List<String> fileList = new ArrayList<>();
+ log.info("container path file list is :{}", output);
+ String[] files = output.split("\n");
+ for (String file : files) {
+ if (StringUtils.isNotEmpty(file)) {
+ log.info("container path file name is :{}", file);
+ fileList.add(file);
+ }
+ }
+ return fileList;
+ }
+
+ @TestTemplate
+ public void testLocalFileCatalog(TestContainer container)
+ throws IOException, InterruptedException {
+ // Walks one directory through its lifecycle: absent -> created (empty) -> dropped.
+ final HadoopFileSystemProxy fsProxy = new HadoopFileSystemProxy(new LocalFileHadoopConf());
+ final String catalogName = FileSystemType.LOCAL.getFileSystemPluginName();
+ final LocalFileCatalog catalog =
+ new LocalFileCatalog(fsProxy, "/tmp/seatunnel/json/test1", catalogName);
+ final TablePath anyPath = TablePath.DEFAULT;
+ Assertions.assertFalse(catalog.tableExists(anyPath));
+ catalog.createTable(null, null, false);
+ Assertions.assertTrue(catalog.tableExists(anyPath));
+ Assertions.assertFalse(catalog.isExistsData(anyPath));
+ catalog.dropTable(anyPath, false);
+ Assertions.assertFalse(catalog.tableExists(anyPath));
+ }
+
private Path convertToLzoFile(File file) throws IOException {
LzopCodec lzo = new LzopCodec();
Path path = Paths.get(file.getAbsolutePath() + ".lzo");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
index c5ea2a734e3..458bb052585 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
@@ -77,5 +77,7 @@ sink {
file_format_type = "json"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
+ "schema_save_mode"="RECREATE_SCHEMA"
+ "data_save_mode"="DROP_DATA"
}
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json_save_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json_save_mode.conf
new file mode 100644
index 00000000000..087bf3f29bb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json_save_mode.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_string = string
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ }
+ }
+ result_table_name = "fake"
+ rows = [
+ {fields = ["1",1,1,123,42543,1.2], kind = INSERT}
+ {fields = ["2",1,1,123,42543,1.2], kind = INSERT}
+ ]
+ }
+}
+
+sink {
+ LocalFile {
+ path = "/tmp/seatunnel/localfile/json/${table_name}"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "json"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ "schema_save_mode"="RECREATE_SCHEMA"
+ "data_save_mode"="DROP_DATA"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
index 84356210ea3..910484f65b7 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
@@ -75,17 +75,7 @@ transform {
}
sink {
- LocalFile {
- path = "/hive/warehouse/test1"
- field_delimiter = "\t"
- row_delimiter = "\n"
- partition_by = ["c_string"]
- partition_dir_expression = "${k0}=${v0}"
- is_partition_field_write_in_file = true
- file_name_expression = "${transactionId}_${now}"
- file_format_type = "text"
- filename_time_format = "yyyy.MM.dd"
- is_enable_transaction = true
- save_mode = "error"
+ InMemory {
+ throw_exception = true
}
}