Skip to content

Commit

Permalink
[Improve][Connector-V2] Save data savemode operation when schema save…
Browse files Browse the repository at this point in the history
…mode created new table (#8178)
  • Loading branch information
zhangshenghang authored Nov 29, 2024
1 parent a145abc commit c0f3781
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class DefaultSaveModeHandler implements SaveModeHandler {
@Nonnull public TablePath tablePath;
@Nullable public CatalogTable catalogTable;
@Nullable public String customSql;
private boolean isNewTableCreated;

public DefaultSaveModeHandler(
SchemaSaveMode schemaSaveMode,
Expand All @@ -56,7 +57,18 @@ public DefaultSaveModeHandler(
catalog,
catalogTable.getTableId().toTablePath(),
catalogTable,
customSql);
customSql,
false);
}

public DefaultSaveModeHandler(
SchemaSaveMode schemaSaveMode,
DataSaveMode dataSaveMode,
Catalog catalog,
TablePath tablePath,
CatalogTable catalogTable,
String customSql) {
this(schemaSaveMode, dataSaveMode, catalog, tablePath, catalogTable, customSql, false);
}

@Override
Expand Down Expand Up @@ -123,7 +135,7 @@ protected void errorWhenSchemaNotExist() {
}

protected void keepSchemaDropData() {
if (tableExists()) {
if (tableExists() && !isNewTableCreated) {
truncateTable();
}
}
Expand Down Expand Up @@ -187,6 +199,7 @@ protected void createTablePreCheck() {
protected void createTable() {
createTablePreCheck();
catalog.createTable(tablePath, catalogTable, true);
isNewTableCreated = true;
}

protected void truncateTable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.InMemoryCatalog;
import org.apache.seatunnel.api.table.catalog.InMemoryCatalogFactory;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class DefaultSaveModeHandlerTest {

private SeaTunnelRowType rowType;
private InMemoryCatalogFactory catalogFactory;

@BeforeEach
public void setup() {
String[] fieldNames = new String[] {"id", "name", "description", "weight"};
SeaTunnelDataType<?>[] dataTypes =
new SeaTunnelDataType[] {
BasicType.LONG_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE
};
rowType = new SeaTunnelRowType(fieldNames, dataTypes);
catalogFactory = new InMemoryCatalogFactory();
}

@Test
public void shouldTruncateExistingTable() {
// SchemaSaveMode is CREATE_SCHEMA_WHEN_NOT_EXIST and DataSaveMode is DROP_DATA and table
// exist, truncateTable needs to be executed
CatalogTable catalogTable = createCatalogTable("table1");
Catalog catalog = catalogFactory.createCatalog("test", null);
DefaultSaveModeHandler handler =
createHandler(
SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
DataSaveMode.DROP_DATA,
catalog,
catalogTable);

handler.handleSchemaSaveMode();
handler.handleDataSaveMode();

InMemoryCatalog inMemoryCatalog = (InMemoryCatalog) catalog;
assertTrue(inMemoryCatalog.isRunTruncateTable(), "Should truncate data for existing table");
}

@Test
public void shouldNotTruncateNewlyCreatedTable() {
// SchemaSaveMode is CREATE_SCHEMA_WHEN_NOT_EXIST and DataSaveMode is DROP_DATA and table
// not exist, truncateTable no needs to be executed
CatalogTable catalogTable = createCatalogTable("notExistsTable");
Catalog catalog = catalogFactory.createCatalog("test", null);
DefaultSaveModeHandler handler =
createHandler(
SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
DataSaveMode.DROP_DATA,
catalog,
catalogTable);

handler.handleSchemaSaveMode();
handler.handleDataSaveMode();

InMemoryCatalog inMemoryCatalog = (InMemoryCatalog) catalog;
assertFalse(
inMemoryCatalog.isRunTruncateTable(),
"Should not truncate data for newly created table");
}

@Test
public void shouldNotTruncateRecreatedTable() {
// SchemaSaveMode is RECREATE_SCHEMA and DataSaveMode is DROP_DATA , truncateTable no needs
// to be executed
CatalogTable catalogTable = createCatalogTable("notExistsTable");
Catalog catalog = catalogFactory.createCatalog("test", null);
DefaultSaveModeHandler handler =
createHandler(
SchemaSaveMode.RECREATE_SCHEMA,
DataSaveMode.DROP_DATA,
catalog,
catalogTable);

handler.handleSchemaSaveMode();
handler.handleDataSaveMode();

InMemoryCatalog inMemoryCatalog = (InMemoryCatalog) catalog;
assertFalse(
inMemoryCatalog.isRunTruncateTable(),
"Should not truncate data for recreated table");
}

private CatalogTable createCatalogTable(String tableName) {
return CatalogTableUtil.getCatalogTable("", "st", "public", tableName, rowType);
}

private DefaultSaveModeHandler createHandler(
SchemaSaveMode schemaSaveMode,
DataSaveMode dataSaveMode,
Catalog catalog,
CatalogTable catalogTable) {
return new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, catalogTable, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.lang3.tuple.Pair;

import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
Expand All @@ -47,6 +48,7 @@ public class InMemoryCatalog implements Catalog {
private final Map<String, List<CatalogTable>> catalogTables;
private static final String DEFAULT_DATABASE = "default";
private static final String UNSUPPORTED_DATABASE = "unsupported";
@Getter private boolean isRunTruncateTable = false;

InMemoryCatalog(String catalogName, ReadonlyConfig options) {
this.name = catalogName;
Expand Down Expand Up @@ -161,6 +163,12 @@ public String getDefaultDatabase() throws CatalogException {
return DEFAULT_DATABASE;
}

@Override
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
isRunTruncateTable = true;
}

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
return catalogTables.containsKey(databaseName);
Expand Down

0 comments on commit c0f3781

Please sign in to comment.