Skip to content

Commit

Permalink
[Feature][Iceberg] Support custom delete sql for sink savemode (#8094)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Nov 29, 2024
1 parent 18b2496 commit 29ca928
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 3 deletions.
3 changes: 2 additions & 1 deletion docs/en/connector-v2/sink/Iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ libfb303-xxx.jar

## Sink Options

| Name | Type | Required | Default | Description |
| Name | Type | Required | Default | Description |
|----------------------------------------|---------|----------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| catalog_name | string | yes | default | User-specified catalog name. default is `default` |
| namespace | string | yes | default | The iceberg database name in the backend catalog. default is `default` |
Expand All @@ -76,6 +76,7 @@ libfb303-xxx.jar
| iceberg.table.upsert-mode-enabled | boolean | no | false | Set to `true` to enable upsert mode, default is `false` |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below |
| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below |
| custom_sql | string | no | - | Custom `delete` SQL statement executed when `data_save_mode` is `CUSTOM_PROCESSING`. Only statements of the form `delete from ... where ...` are supported. |
| iceberg.table.commit-branch | string | no | - | Default branch for commits |

## Task Example
Expand Down
10 changes: 10 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- JSqlParser: used by the Iceberg catalog to parse the custom `delete from ... where ...` SQL of the sink save mode. -->
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
<version>${jsqlparser.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
Expand Down Expand Up @@ -244,6 +249,11 @@
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
</relocation>
<!-- Relocate the bundled JSqlParser classes under the connector's shade prefix
     (presumably to avoid clashing with another JSqlParser version on the runtime classpath; confirm). -->
<relocation>
<pattern>net.sf.jsqlparser</pattern>
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.net.sf.jsqlparser</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.ExpressionUtils;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;

import org.apache.iceberg.PartitionField;
Expand All @@ -44,9 +45,13 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Types;

import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.delete.Delete;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -212,7 +217,39 @@ public boolean isExistsData(TablePath tablePath) {

/**
 * Executes a custom {@code delete from ... where ...} statement against an Iceberg table.
 *
 * <p>The SQL is parsed with JSqlParser, its {@code WHERE} clause is converted into an Iceberg
 * row-filter {@link Expression} bound to the table schema, and the matching rows are removed
 * through a single delete commit.
 *
 * <p>If the table named in the SQL has no database part, the database of {@code tablePath} is
 * used. When the resolved table differs from {@code tablePath} only a warning is logged; the
 * delete is still applied to the table named in the SQL.
 *
 * @param tablePath the sink's target table, used to resolve a missing database name
 * @param sql the custom delete statement
 * @throws IllegalArgumentException if {@code sql} cannot be parsed or is not a delete statement
 */
@Override
public void executeSql(TablePath tablePath, String sql) {
    Delete delete;
    try {
        Statement statement = CCJSqlParserUtil.parse(sql);
        delete = (Delete) statement;
    } catch (Throwable e) {
        // Parse failures and the ClassCastException raised for a non-delete statement are
        // deliberately reported the same way, keeping the original failure as the cause.
        throw new IllegalArgumentException(
                "Only support sql: delete from ... where ..., Not support: " + sql, e);
    }

    TablePath targetTablePath = TablePath.of(delete.getTable().getFullyQualifiedName(), false);
    if (targetTablePath.getDatabaseName() == null) {
        // Unqualified table name in the SQL: fall back to the sink table's database.
        targetTablePath =
                TablePath.of(tablePath.getDatabaseName(), targetTablePath.getTableName());
    }
    if (!targetTablePath.equals(tablePath)) {
        log.warn(
                "The delete table {} is not equal to the target table {}",
                targetTablePath,
                tablePath);
    }

    // Load the table once and reuse it, so the filter is bound against the schema of the very
    // same table instance that performs the delete (and the catalog is not queried twice).
    Table table = catalog.loadTable(toIcebergTableIdentifier(targetTablePath));
    // NOTE(review): a statement without a WHERE clause passes null to ExpressionUtils.convert;
    // confirm how that helper handles it.
    Expression expression = ExpressionUtils.convert(delete.getWhere(), table.schema());
    table.newDelete().deleteFromRowFilter(expression).commit();
    log.info(
            "Delete table {} data success, sql [{}] to deleteFromRowFilter: {}",
            targetTablePath,
            sql,
            expression);
}

public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public class SinkConfig extends CommonConfig {
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data save mode");

/**
 * Sink option {@code custom_sql}: a string option with no default value that carries the custom
 * delete SQL used by the data save mode.
 */
public static final Option<String> DATA_SAVE_MODE_CUSTOM_SQL =
Options.key("custom_sql")
.stringType()
.noDefaultValue()
.withDescription("custom delete data sql for data save mode");

public static final Option<String> TABLES_DEFAULT_COMMIT_BRANCH =
Options.key("iceberg.table.commit-branch")
.stringType()
Expand All @@ -128,6 +134,7 @@ public class SinkConfig extends CommonConfig {
private boolean tableSchemaEvolutionEnabled;
private SchemaSaveMode schemaSaveMode;
private DataSaveMode dataSaveMode;
private String dataSaveModeSQL;

public SinkConfig(ReadonlyConfig readonlyConfig) {
super(readonlyConfig);
Expand All @@ -140,6 +147,7 @@ public SinkConfig(ReadonlyConfig readonlyConfig) {
this.tableSchemaEvolutionEnabled = readonlyConfig.get(TABLE_SCHEMA_EVOLUTION_ENABLED_PROP);
this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
this.dataSaveModeSQL = readonlyConfig.get(DATA_SAVE_MODE_CUSTOM_SQL);
this.commitBranch = readonlyConfig.get(TABLES_DEFAULT_COMMIT_BRANCH);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
config.getDataSaveMode(),
catalog,
catalogTable,
null));
config.getDataSaveModeSQL()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
Expand Down Expand Up @@ -56,13 +57,19 @@ public OptionRule optionRule() {
SinkConfig.KERBEROS_KEYTAB_PATH,
SinkConfig.KRB5_PATH,
SinkConfig.WRITE_PROPS,
SinkConfig.SCHEMA_SAVE_MODE,
SinkConfig.DATA_SAVE_MODE,
SinkConfig.AUTO_CREATE_PROPS,
SinkConfig.TABLE_PRIMARY_KEYS,
SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
SinkConfig.DATA_SAVE_MODE,
DataSaveMode.CUSTOM_PROCESSING,
SinkConfig.DATA_SAVE_MODE_CUSTOM_SQL)
.build();
}

Expand Down
Loading

0 comments on commit 29ca928

Please sign in to comment.