[FLINK-36061][iceberg] Add Iceberg Sink. #3904

Open · wants to merge 1 commit into base: master

Conversation

@lvyanquan (Contributor) commented Feb 5, 2025

Add Iceberg DataSink.
Notice:
This PR does not include the logic of automatically compacting small files.

Co-authored with @czy006, based on #3877.

@lvyanquan lvyanquan marked this pull request as draft February 5, 2025 01:40
@github-actions github-actions bot added the docs, mysql-cdc-connector, and e2e-tests labels Feb 7, 2025
@lvyanquan lvyanquan force-pushed the FLINK-36061 branch 2 times, most recently from 1d17ae3 to f0d2899 Compare February 8, 2025 11:58
@lvyanquan lvyanquan marked this pull request as ready for review February 10, 2025 01:45
@SML0127 (Contributor) left a comment:

Impressive and long-awaited work! I left some minor comments and questions. If possible, I'd like to contribute as well.

Comment on lines +96 to +130
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
    LOGGER.info(String.format("Nothing to commit to table %s, skipping", table.name()));
} else {
    if (deleteFiles.isEmpty()) {
        AppendFiles append = table.newAppend();
        dataFiles.forEach(append::appendFile);
        append.commit();
    } else {
        RowDelta delta = table.newRowDelta();
        dataFiles.forEach(delta::addRows);
        deleteFiles.forEach(delta::addDeletes);
        delta.commit();
    }
Contributor:

Iceberg supports empty commits to remove old tmp files and clean some data from Flink state (apache/iceberg#6630). I'd like to know if a similar issue exists here.
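
A minimal sketch of the alternative being referenced, assuming it would slot into the commit method quoted above; checkpointId and the snapshot summary property name mirror Iceberg's own Flink committer and are assumptions here, not part of this PR:

if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
    // Commit an empty snapshot instead of skipping, so that state and temporary
    // files tied to earlier checkpoints can still be cleaned up (see apache/iceberg#6630).
    AppendFiles append = table.newAppend();
    // `checkpointId` is a hypothetical variable standing in for whatever
    // checkpoint/commit identifier is available at this point.
    append.set("flink.max-committed-checkpoint-id", String.valueOf(checkpointId));
    append.commit();
}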

Comment on lines +51 to +63
if (writeResult.dataFiles() != null) {
    for (DataFile dataFile : writeResult.dataFiles()) {
        addCount += dataFile.recordCount();
    }
}
long deleteCount = 0;
if (writeResult.deleteFiles() != null) {
    for (DeleteFile dataFile : writeResult.deleteFiles()) {
        deleteCount += dataFile.recordCount();
    }
}
Contributor:

It would be nice if we could also provide some metrics (e.g., numRecordsInDataFiles, numDataFiles).
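
A rough sketch of what that could look like, assuming the writer has access to a Flink MetricGroup (for example from the sink's init context); the class, method, and metric names are only illustrative and not part of this PR:

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.io.WriteResult;

class WriterMetricsSketch {
    // Sketch only: in a real writer the counters would be registered once,
    // e.g. in the constructor, rather than on every call.
    static void report(MetricGroup metricGroup, WriteResult writeResult) {
        Counter numDataFiles = metricGroup.counter("numDataFiles");
        Counter numRecordsInDataFiles = metricGroup.counter("numRecordsInDataFiles");
        if (writeResult.dataFiles() != null) {
            numDataFiles.inc(writeResult.dataFiles().length);
            for (DataFile dataFile : writeResult.dataFiles()) {
                numRecordsInDataFiles.inc(dataFile.recordCount());
            }
        }
    }
}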

Comment on lines +52 to +54
public static final ConfigOption<String> PARTITION_KEY =
        key("partition.key")
                .stringType()
                .defaultValue("")
                .withDescription(
                        "Partition keys for each partitioned table, allow setting multiple primary keys for multiTables. "
                                + "Tables are separated by ';', and partition keys are separated by ','. "
                                + "For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'.");
Contributor:

Currently, there is no issue since only identity is supported. However, I'm concerned that it may become more complex when other transforms, such as day, hour, bucket(n), and truncate(w), are supported in the future.

Contributor Author:

Sure, we should support more partition transform functions.
However, a thorny issue is how to configure these transformations: they take effect on individual fields, and we usually have many tables and fields, which makes them too complex to configure one by one. We need a more elegant configuration scheme.
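
For context, the transforms mentioned above correspond to Iceberg's PartitionSpec builder roughly as follows; the schema is hypothetical and only illustrates what a richer configuration would eventually need to express:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class PartitionSpecSketch {
    // Sketch only, not part of this PR.
    static PartitionSpec example() {
        Schema schema =
                new Schema(
                        Types.NestedField.required(1, "id", Types.LongType.get()),
                        Types.NestedField.required(2, "name", Types.StringType.get()),
                        Types.NestedField.required(3, "code", Types.StringType.get()),
                        Types.NestedField.required(4, "ts", Types.TimestampType.withZone()));
        return PartitionSpec.builderFor(schema)
                .identity("id")      // what partition.key can express today
                .day("ts")           // day(ts)
                .bucket("name", 16)  // bucket(16, name)
                .truncate("code", 4) // truncate(4, code)
                .build();
    }
}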

import static org.apache.flink.cdc.common.configuration.ConfigOptions.key;

/** Config options for {@link IcebergDataSink}. */
public class IcebergDataSinkOptions {
Contributor:

How about adding options for the catalog name and namespace?

Contributor Author:

I don't quite understand what kind of options you mean. Could you explain more about this?

Contributor:

I wonder what you think about receiving catalog-name as a separate option.
I also understand that the current code separates the namespace and table name based on a dot. This method works too, but I would like to ask for opinions on providing the namespace name as a separate option.

Suggested change
public class IcebergDataSinkOptions {
public class IcebergDataSinkOptions {
    public static final ConfigOption<String> CATALOG_NAME =
            key("catalog.name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The name of iceberg catalog to use");

    public static final ConfigOption<String> NAMESPACE_NAME =
            key("namespace.name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The name of iceberg namespace to use");

TableIdentifier.java

  public static TableIdentifier of(Namespace namespace, String name) {
    return new TableIdentifier(namespace, name);
  }

  public static TableIdentifier parse(String identifier) {
    Preconditions.checkArgument(identifier != null, "Cannot parse table identifier: null");
    Iterable<String> parts = DOT.split(identifier);
    return TableIdentifier.of(Iterables.toArray(parts, String.class));
  }
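
As a concrete illustration of the dot-based splitting in the quoted Iceberg code (a sketch; the table name is made up):

import org.apache.iceberg.catalog.TableIdentifier;

// Everything before the last dot becomes the (possibly multi-level) namespace,
// the last part becomes the table name:
// id.namespace() -> "testdb", id.name() -> "table1".
TableIdentifier id = TableIdentifier.parse("testdb.table1");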

@lvyanquan lvyanquan force-pushed the FLINK-36061 branch 2 times, most recently from 350e437 to 7c5033c Compare March 18, 2025 12:51
@github-actions github-actions bot removed the docs and e2e-tests labels Mar 18, 2025
@github-actions github-actions bot added the docs and e2e-tests labels Mar 19, 2025
@lvyanquan lvyanquan marked this pull request as draft March 24, 2025 06:40
@lvyanquan lvyanquan force-pushed the FLINK-36061 branch 3 times, most recently from d654b7a to acb0002 Compare March 26, 2025 01:25
@github-actions github-actions bot removed the docs label Mar 26, 2025
@lvyanquan lvyanquan marked this pull request as ready for review March 26, 2025 01:26
@github-actions github-actions bot added the docs label Mar 26, 2025
@lvyanquan (Contributor Author):

Hi @leonardBang, could you help review this?

@leonardBang leonardBang self-requested a review March 26, 2025 01:50
@yuxiqian (Contributor):

Hi @lvyanquan, thanks for your contribution! Could you please rebase this PR onto the latest master when available?

The code style verifier has been updated to enforce the JUnit 5 + AssertJ frameworks, and these classes might need to be migrated:

  • JUnit 4 style test annotations should be changed to JUnit 5 equivalents

    • org.junit.Test => org.junit.jupiter.api.Test
    • @Before, @BeforeClass => @BeforeEach, @BeforeAll
    • @After, @AfterClass => @AfterEach, @AfterAll
  • JUnit Assertions / Hamcrest Assertions are not allowed, including:

    • org.junit.Assert
    • org.junit.jupiter.api.Assertions
    • org.hamcrest.*

org.assertj.core.api.Assertions should be used instead.

Run mvn verify -DskipTests locally to check if all these requirements have been satisfied.
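
For illustration (not code from this PR), a test written against these requirements would look roughly like this; the class name and assertion are hypothetical:

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class MigrationStyleExampleTest {
    // Hypothetical example, only to show the JUnit 5 + AssertJ style above.
    private int value;

    @BeforeEach
    void setUp() {
        value = 42;
    }

    @Test
    void valueIsInitialized() {
        // AssertJ assertions replace org.junit.Assert / Assertions / Hamcrest.
        assertThat(value).isEqualTo(42);
    }
}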

+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: evolve\n"
+ " parallelism: 1",
Contributor:

Please add a test with larger parallelism.

if (commitTimes >= compactionOptions.getCommitInterval()
&& !compactedTables.contains(tableId)) {
if (throwable != null) {
throw new RuntimeException(throwable);
Contributor:

Do we need to move this if statement to the start of the processElement method, i.e., out of the if (element.getValue() instanceof CommittableWithLineage) block?


import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount;

/** Util class for {@link IcebergDataSink}. */
Contributor:

Suggested change
/** Util class for {@link IcebergDataSink}. */
/** Util class for types in {@link IcebergDataSink}. */

public class IcebergTypeUtils {

    /** Convert column from CDC framework to Iceberg framework. */
    public static Types.NestedField convertCDCColumnToIcebergField(
Contributor:

Suggested change
public static Types.NestedField convertCDCColumnToIcebergField(
public static Types.NestedField convertCdcColumnToIcebergField(

case TIMESTAMP_WITH_TIME_ZONE:
    return Types.TimestampType.withZone();
default:
    throw new IllegalArgumentException("Illegal type: " + type);
Contributor:

Suggested change
throw new IllegalArgumentException("Illegal type: " + type);
throw new IllegalArgumentException("Unsupported cdc type in iceberg: " + type);

};
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
Contributor:

Is it right that TIMESTAMP_WITH_LOCAL_TIME_ZONE and TIMESTAMP_WITH_TIME_ZONE have the same code?

@Override
public Set<ConfigOption<?>> requiredOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    return options;
Contributor:

return

Suggested change
return options;
return new HashSet<>();

this.catalogOptions = catalogOptions;
this.tableOptions = new HashMap<>();
this.partitionMaps = new HashMap<>();
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
Contributor:

Suggested change
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
this(catalogOptions, new HashMap<>(), new HashMap<>());

try {
    UpdateSchema updateSchema = table.updateSchema();
    for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
        Column addColumn = columnWithPosition.getAddColumn();
Contributor:

Suggested change
Column addColumn = columnWithPosition.getAddColumn();
Column addColumn = columnWithPosition.getAddColumn();
String columnName = addColumn.getName();
LogicalType logicalType =
        FlinkSchemaUtil.convert(
                DataTypeUtils.toFlinkDataType(addColumn.getType())
                        .getLogicalType());

Use columnName and logicalType in the following code.

public void close() throws Exception {
    if (schemaMap != null) {
        schemaMap.clear();
        schemaMap = null;
Contributor:

Will IcebergWriter be reused?

Labels: docs, e2e-tests

4 participants