Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Iceberg] Support custom delete sql for sink savemode #8094

Merged
merged 1 commit into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/en/connector-v2/sink/Iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ libfb303-xxx.jar

## Sink Options

| Name | Type | Required | Default | Description |
| Name | Type | Required | Default | Description |
|----------------------------------------|---------|----------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| catalog_name | string | yes | default | User-specified catalog name. default is `default` |
| namespace | string | yes | default | The iceberg database name in the backend catalog. default is `default` |
Expand All @@ -76,6 +76,7 @@ libfb303-xxx.jar
| iceberg.table.upsert-mode-enabled | boolean | no | false | Set to `true` to enable upsert mode, default is `false` |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below |
| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below |
| custom_sql | string | no | - | Custom `delete` SQL statement to execute when `data_save_mode` is `CUSTOM_PROCESSING`. Only `delete` statements are supported, e.g. `delete from ... where ...` |
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
| iceberg.table.commit-branch | string | no | - | Default branch for commits |

## Task Example
Expand Down
10 changes: 10 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
<version>${jsqlparser.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
Expand Down Expand Up @@ -244,6 +249,11 @@
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
</relocation>
<relocation>
<pattern>net.sf.jsqlparser</pattern>
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.net.sf.jsqlparser</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.ExpressionUtils;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;

import org.apache.iceberg.PartitionField;
Expand All @@ -44,9 +45,13 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Types;

import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.delete.Delete;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -212,7 +217,39 @@ public boolean isExistsData(TablePath tablePath) {

/**
 * Executes a custom {@code DELETE} statement against an Iceberg table.
 *
 * <p>The statement is parsed with JSqlParser, its {@code WHERE} clause is converted into an
 * Iceberg row filter by {@code ExpressionUtils.convert}, and the filter is committed through
 * {@code deleteFromRowFilter}.
 *
 * @param tablePath the sink's target table; its database name is used when the table named in
 *     the statement carries no database part
 * @param sql the statement to run
 * @throws IllegalArgumentException if {@code sql} cannot be parsed or is not a delete statement
 */
@Override
public void executeSql(TablePath tablePath, String sql) {
    Statement statement;
    try {
        statement = CCJSqlParserUtil.parse(sql);
    } catch (Throwable e) {
        throw new IllegalArgumentException(
                "Only support sql: delete from ... where ..., Not support: " + sql, e);
    }
    // Check the statement type explicitly instead of relying on a caught ClassCastException.
    if (!(statement instanceof Delete)) {
        throw new IllegalArgumentException(
                "Only support sql: delete from ... where ..., Not support: " + sql);
    }
    Delete delete = (Delete) statement;

    TablePath targetTablePath = TablePath.of(delete.getTable().getFullyQualifiedName(), false);
    if (targetTablePath.getDatabaseName() == null) {
        // Fall back to the sink table's database when the statement does not name one.
        targetTablePath =
                TablePath.of(tablePath.getDatabaseName(), targetTablePath.getTableName());
    }
    if (!targetTablePath.equals(tablePath)) {
        // Only warned about: the delete still runs against the table named in the statement.
        log.warn(
                "The delete table {} is not equal to the target table {}",
                targetTablePath,
                tablePath);
    }

    TableIdentifier icebergTableIdentifier = toIcebergTableIdentifier(targetTablePath);
    // Load the table once so the filter's schema and the delete use the same table instance.
    Table table = catalog.loadTable(icebergTableIdentifier);
    Expression expression = ExpressionUtils.convert(delete.getWhere(), table.schema());
    table.newDelete().deleteFromRowFilter(expression).commit();
    log.info(
            "Delete table {} data success, sql [{}] to deleteFromRowFilter: {}",
            targetTablePath,
            sql,
            expression);
}

public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public class SinkConfig extends CommonConfig {
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data save mode");

public static final Option<String> DATA_SAVE_MODE_CUSTOM_SQL =
Options.key("custom_sql")
.stringType()
.noDefaultValue()
.withDescription("custom delete data sql for data save mode");

public static final Option<String> TABLES_DEFAULT_COMMIT_BRANCH =
Options.key("iceberg.table.commit-branch")
.stringType()
Expand All @@ -128,6 +134,7 @@ public class SinkConfig extends CommonConfig {
private boolean tableSchemaEvolutionEnabled;
private SchemaSaveMode schemaSaveMode;
private DataSaveMode dataSaveMode;
private String dataSaveModeSQL;

public SinkConfig(ReadonlyConfig readonlyConfig) {
super(readonlyConfig);
Expand All @@ -140,6 +147,7 @@ public SinkConfig(ReadonlyConfig readonlyConfig) {
this.tableSchemaEvolutionEnabled = readonlyConfig.get(TABLE_SCHEMA_EVOLUTION_ENABLED_PROP);
this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
this.dataSaveModeSQL = readonlyConfig.get(DATA_SAVE_MODE_CUSTOM_SQL);
this.commitBranch = readonlyConfig.get(TABLES_DEFAULT_COMMIT_BRANCH);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
config.getDataSaveMode(),
catalog,
catalogTable,
null));
config.getDataSaveModeSQL()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
Expand Down Expand Up @@ -56,13 +57,19 @@ public OptionRule optionRule() {
SinkConfig.KERBEROS_KEYTAB_PATH,
SinkConfig.KRB5_PATH,
SinkConfig.WRITE_PROPS,
SinkConfig.SCHEMA_SAVE_MODE,
SinkConfig.DATA_SAVE_MODE,
SinkConfig.AUTO_CREATE_PROPS,
SinkConfig.TABLE_PRIMARY_KEYS,
SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
SinkConfig.DATA_SAVE_MODE,
DataSaveMode.CUSTOM_PROCESSING,
SinkConfig.DATA_SAVE_MODE_CUSTOM_SQL)
.build();
}

Expand Down
Loading
Loading