Skip to content

Commit

Permalink
[feature][connector-file-local] add save mode function for localfile (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
chl-wxp authored Jul 9, 2024
1 parent 194472b commit 7b2f538
Show file tree
Hide file tree
Showing 11 changed files with 441 additions and 14 deletions.
18 changes: 18 additions & 0 deletions docs/en/connector-v2/sink/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. |
| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.<br/> false:don't write header,true:write header. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |
| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method |
| data_save_mode | string | no | APPEND_DATA | Existing data processing method |

### path [string]

Expand Down Expand Up @@ -205,6 +207,20 @@ Only used when file_format_type is text,csv.false:don't write header,true:write
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.

### schema_save_mode [string]

Existing dir processing method.
- RECREATE_SCHEMA: create the dir when it does not exist; delete and recreate it when it already exists
- CREATE_SCHEMA_WHEN_NOT_EXIST: create the dir when it does not exist; skip when it already exists
- ERROR_WHEN_SCHEMA_NOT_EXIST: report an error when the dir does not exist

### data_save_mode [string]

Existing data processing method.
- DROP_DATA: preserve dir and delete data files
- APPEND_DATA: preserve dir, preserve data files
- ERROR_WHEN_DATA_EXISTS: report an error when data files already exist

## Example

For orc file format simple config
Expand Down Expand Up @@ -278,6 +294,8 @@ LocalFile {
file_format_type="excel"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
schema_save_mode=RECREATE_SCHEMA
data_save_mode=DROP_DATA
}

```
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.catalog;

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.fs.LocatedFileStatus;

import lombok.SneakyThrows;

import java.util.Collections;
import java.util.List;

/**
 * Base {@link Catalog} implementation for file connectors used by the save-mode machinery.
 *
 * <p>The "table" managed by this catalog is a single directory ({@code filePath}) that is fixed at
 * construction time; the {@link TablePath} arguments are therefore ignored and every operation
 * acts on that directory. Database-level operations are no-ops because file systems have no
 * database concept here.
 */
public abstract class AbstractFileCatalog implements Catalog {

    protected final String catalogName;
    private final HadoopFileSystemProxy hadoopFileSystemProxy;
    private final String filePath;

    protected AbstractFileCatalog(
            HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
        this.catalogName = catalogName;
        this.filePath = filePath;
        this.hadoopFileSystemProxy = hadoopFileSystemProxy;
    }

    @Override
    public void open() throws CatalogException {
        // Nothing to initialize: the file system proxy is usable as soon as it is constructed.
    }

    @Override
    public void close() throws CatalogException {
        // NOTE(review): the HadoopFileSystemProxy is not closed here; its lifecycle appears to be
        // owned by the caller that constructed it — confirm against the factory/handler code.
    }

    @Override
    public String name() {
        return catalogName;
    }

    @Override
    public String getDefaultDatabase() throws CatalogException {
        // File systems have no database concept, so there is no sensible default.
        return null;
    }

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
        // Databases are not modeled for file catalogs.
        return false;
    }

    @Override
    public List<String> listDatabases() throws CatalogException {
        // Return an empty list instead of null so callers can iterate without a null check.
        return Collections.emptyList();
    }

    @Override
    public List<String> listTables(String databaseName)
            throws CatalogException, DatabaseNotExistException {
        // Return an empty list instead of null so callers can iterate without a null check.
        return Collections.emptyList();
    }

    /** A "table" exists when the configured path exists on the file system. */
    @SneakyThrows
    @Override
    public boolean tableExists(TablePath tablePath) throws CatalogException {
        return hadoopFileSystemProxy.fileExist(filePath);
    }

    @Override
    public CatalogTable getTable(TablePath tablePath)
            throws CatalogException, TableNotExistException {
        // NOTE(review): a schema cannot be derived from a bare directory, so no table metadata is
        // returned here — verify that callers of the save-mode flow tolerate this.
        return null;
    }

    /**
     * Creates the target directory. {@code ignoreIfExists} is not consulted; creating an existing
     * directory is treated as a no-op by the proxy — TODO confirm createDir semantics.
     */
    @SneakyThrows
    @Override
    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        hadoopFileSystemProxy.createDir(filePath);
    }

    /** Drops the "table" by deleting the configured path recursively. */
    @SneakyThrows
    @Override
    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        hadoopFileSystemProxy.deleteFile(filePath);
    }

    @Override
    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
            throws DatabaseAlreadyExistException, CatalogException {
        // No-op: databases are not modeled for file catalogs.
    }

    @Override
    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
            throws DatabaseNotExistException, CatalogException {
        // No-op: databases are not modeled for file catalogs.
    }

    /** Truncates by deleting the directory and recreating it empty (implements DROP_DATA). */
    @SneakyThrows
    @Override
    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        hadoopFileSystemProxy.deleteFile(filePath);
        hadoopFileSystemProxy.createDir(filePath);
    }

    /** Returns true when the configured directory contains at least one file. */
    @SneakyThrows
    @Override
    public boolean isExistsData(TablePath tablePath) {
        final List<LocatedFileStatus> locatedFileStatuses =
                hadoopFileSystemProxy.listFile(filePath);
        return CollectionUtils.isNotEmpty(locatedFileStatuses);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
Expand All @@ -28,6 +30,10 @@
import java.util.Collections;
import java.util.List;

import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;

public class BaseSinkConfig {
public static final String SEATUNNEL = "seatunnel";
public static final String NON_PARTITION = "NON_PARTITION";
Expand Down Expand Up @@ -293,4 +299,20 @@ public class BaseSinkConfig {
.defaultValue(Collections.emptyList())
.withDescription(
"Support writing Parquet INT96 from a 12-byte field, only valid for parquet files.");

// How to handle an already-existing target directory before the sink starts writing.
// Default CREATE_SCHEMA_WHEN_NOT_EXIST: create the dir only when missing, otherwise leave it.
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription(
"Before the synchronization task begins, process the existing path");

// How to handle data files already present in the target directory.
// singleChoice restricts the accepted values to DROP_DATA, APPEND_DATA and
// ERROR_WHEN_DATA_EXISTS; default APPEND_DATA keeps existing files untouched.
public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.singleChoice(
DataSaveMode.class,
Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
.defaultValue(APPEND_DATA)
.withDescription(
"Before the synchronization task begins, different processing of data files that already exist in the directory");
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
Expand All @@ -39,20 +47,25 @@
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;

public abstract class BaseMultipleTableFileSink
implements SeaTunnelSink<
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
SupportMultiTableSink {
SupportMultiTableSink,
SupportSaveMode {

private final HadoopConf hadoopConf;
private final CatalogTable catalogTable;
private final FileSinkConfig fileSinkConfig;
private String jobId;
private final ReadonlyConfig readonlyConfig;

public abstract String getPluginName();

public BaseMultipleTableFileSink(
HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
this.readonlyConfig = readonlyConfig;
this.hadoopConf = hadoopConf;
this.fileSinkConfig =
new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
Expand Down Expand Up @@ -103,4 +116,23 @@ protected WriteStrategy createWriteStrategy() {
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
return writeStrategy;
}

/**
 * Builds the save-mode handler that pre-processes the target directory before the job runs.
 *
 * <p>Looks up a {@link CatalogFactory} whose identifier matches this sink's plugin name; when no
 * such factory is registered the connector does not support save modes and an empty Optional is
 * returned. Otherwise the configured schema/data save modes are read from
 * {@code schema_save_mode} / {@code data_save_mode} and wrapped in a
 * {@link DefaultSaveModeHandler}.
 *
 * @return a handler applying the configured save modes, or empty when no catalog factory exists
 */
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {

// Discover the matching catalog factory via SPI on the current context class loader.
CatalogFactory catalogFactory =
discoverFactory(
Thread.currentThread().getContextClassLoader(),
CatalogFactory.class,
getPluginName());
if (catalogFactory == null) {
return Optional.empty();
}
final Catalog catalog = catalogFactory.createCatalog(getPluginName(), readonlyConfig);
SchemaSaveMode schemaSaveMode = readonlyConfig.get(BaseSinkConfig.SCHEMA_SAVE_MODE);
DataSaveMode dataSaveMode = readonlyConfig.get(BaseSinkConfig.DATA_SAVE_MODE);
// NOTE(review): last argument is the custom SQL, unused for file sinks. The catalog created
// here is not closed in this method — presumably the handler owns its lifecycle; confirm.
return Optional.of(
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, catalogTable, null));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.local.catalog;

import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

/**
 * Catalog for the local file system. All behavior is inherited from
 * {@link AbstractFileCatalog}; this subclass only forwards the constructor arguments.
 */
public class LocalFileCatalog extends AbstractFileCatalog {

public LocalFileCatalog(
HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
super(hadoopFileSystemProxy, filePath, catalogName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.local.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;

import com.google.auto.service.AutoService;

/**
 * SPI factory that creates a {@link LocalFileCatalog} for the save-mode feature of the
 * LocalFile connector. Registered via {@code @AutoService} so it can be discovered by
 * {@code FactoryUtil.discoverFactory}.
 */
@AutoService(Factory.class)
public class LocalFileCatalogFactory implements CatalogFactory {

    @Override
    public String factoryIdentifier() {
        // Must match the sink's plugin name ("LocalFile") for discovery to succeed.
        return FileSystemType.LOCAL.getFileSystemPluginName();
    }

    @Override
    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
        final HadoopFileSystemProxy fileSystemProxy =
                new HadoopFileSystemProxy(new LocalFileHadoopConf());
        final String targetPath = options.get(BaseSourceConfigOptions.FILE_PATH);
        return new LocalFileCatalog(fileSystemProxy, targetPath, factoryIdentifier());
    }

    @Override
    public OptionRule optionRule() {
        // No options are validated at catalog-creation time.
        return OptionRule.builder().build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public OptionRule optionRule() {
return OptionRule.builder()
.required(BaseSinkConfig.FILE_PATH)
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
.optional(BaseSinkConfig.DATA_SAVE_MODE)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
Expand Down
Loading

0 comments on commit 7b2f538

Please sign in to comment.