[FLINK-36061][iceberg] Add Iceberg Sink. #3904
base: master
Conversation
Impressive and long-awaited work! I left some minor comments and questions. If possible, I'd like to contribute as well.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml (resolved)
...eberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java (resolved)
...eberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java (resolved, outdated)
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
    LOGGER.info(String.format("Nothing to commit to table %s, skipping", table.name()));
} else {
    if (deleteFiles.isEmpty()) {
        AppendFiles append = table.newAppend();
        dataFiles.forEach(append::appendFile);
        append.commit();
    } else {
        RowDelta delta = table.newRowDelta();
        dataFiles.forEach(delta::addRows);
        deleteFiles.forEach(delta::addDeletes);
        delta.commit();
    }
}
Iceberg supports empty commits to remove old tmp files and clean some data from Flink state (apache/iceberg#6630). I'd like to know if a similar issue exists here.
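For reference, a rough sketch of what a periodic empty commit could look like in the committer's skip branch. The counter and threshold fields (continuousEmptyCommits, maxContinuousEmptyCommits) are hypothetical names mirroring the behavior discussed in apache/iceberg#6630, not code from this PR:

// Hypothetical sketch: commit an empty snapshot every N otherwise-empty checkpoints
// so Iceberg can expire orphaned temp files and the committer can prune its state.
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
    continuousEmptyCommits++;
    if (continuousEmptyCommits % maxContinuousEmptyCommits == 0) {
        // An empty append still produces a snapshot, which lets cleanup proceed.
        table.newAppend().commit();
        continuousEmptyCommits = 0;
    } else {
        LOGGER.info(String.format("Nothing to commit to table %s, skipping", table.name()));
    }
} else {
    continuousEmptyCommits = 0;
    // ... existing append / row-delta commit logic ...
}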
if (writeResult.dataFiles() != null) {
    for (DataFile dataFile : writeResult.dataFiles()) {
        addCount += dataFile.recordCount();
    }
}
long deleteCount = 0;
if (writeResult.deleteFiles() != null) {
    for (DeleteFile deleteFile : writeResult.deleteFiles()) {
        deleteCount += deleteFile.recordCount();
    }
}
It would be nice if we could also provide some metrics (e.g. numRecordsInDataFiles, numDataFiles).
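If it helps, a minimal sketch of wiring such counters through Flink's metric API. The metric names follow the suggestion above; the surrounding class and its access to a MetricGroup are assumptions, not PR code:

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.io.WriteResult;

// Hypothetical helper that registers and updates commit metrics.
class CommitMetrics {
    private final Counter numDataFiles;
    private final Counter numRecordsInDataFiles;

    CommitMetrics(MetricGroup metricGroup) {
        this.numDataFiles = metricGroup.counter("numDataFiles");
        this.numRecordsInDataFiles = metricGroup.counter("numRecordsInDataFiles");
    }

    void update(WriteResult writeResult) {
        if (writeResult.dataFiles() != null) {
            numDataFiles.inc(writeResult.dataFiles().length);
            for (DataFile dataFile : writeResult.dataFiles()) {
                numRecordsInDataFiles.inc(dataFile.recordCount());
            }
        }
    }
}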
public static final ConfigOption<String> PARTITION_KEY =
        key("partition.key")
                .stringType()
                .defaultValue("")
                .withDescription(
                        "Partition keys for each partitioned table, allowing multiple partition keys to be set for multiple tables. "
                                + "Tables are separated by ';', and partition keys are separated by ','. "
                                + "For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'.");
Currently, there is no issue since only identity is supported. However, I'm concerned that it may become more complex when other transforms, such as day, hour, bucket(n), and truncate(w), are supported in the future.
Sure, we should support more partition transform functions.
However, a thorny issue is how to configure these transformations: they take effect on individual fields, and we usually have too many tables and fields to configure one by one. We need a more elegant configuration scheme.
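For context, a small sketch (not from the PR) of how Iceberg attaches each partition transform to a specific source field, which is why a flat 'table:key1,key2' option is hard to extend. The schema and field names here are made up:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class PartitionSpecSketch {
    // Each transform (identity, bucket, day, ...) is declared per source field, so a
    // transform-aware option would need per-field syntax, e.g. something like
    // 'testdb.table1:bucket(16,id),day(ts)' (purely hypothetical).
    static PartitionSpec example() {
        Schema schema =
                new Schema(
                        Types.NestedField.required(1, "id", Types.LongType.get()),
                        Types.NestedField.required(2, "ts", Types.TimestampType.withZone()));
        return PartitionSpec.builderFor(schema)
                .bucket("id", 16)
                .day("ts")
                .build();
    }
}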
import static org.apache.flink.cdc.common.configuration.ConfigOptions.key;

/** Config options for {@link IcebergDataSink}. */
public class IcebergDataSinkOptions {
How about adding options for the catalog name and the namespace?
I don't quite understand what kind of options you mean; could you explain more about this?
I wonder what you think about receiving catalog-name as a separate option.
I understand that the current code separates the namespace and table name based on a dot. This method is also possible, but I would like to ask for opinions on providing the namespace name as a separate option.
Suggested change:

public class IcebergDataSinkOptions {

    public static final ConfigOption<String> CATALOG_NAME =
            key("catalog.name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The name of iceberg catalog to use");

    public static final ConfigOption<String> NAMESPACE_NAME =
            key("namespace.name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The name of iceberg namespace to use");
public static TableIdentifier of(Namespace namespace, String name) {
return new TableIdentifier(namespace, name);
}
public static TableIdentifier parse(String identifier) {
Preconditions.checkArgument(identifier != null, "Cannot parse table identifier: null");
Iterable<String> parts = DOT.split(identifier);
return TableIdentifier.of(Iterables.toArray(parts, String.class));
}
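To make the trade-off concrete, a brief sketch (assumptions only, not PR code) contrasting the dot-based parsing above with a dedicated namespace option such as the suggested namespace.name:

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

class TableIdResolution {
    // Current approach: split namespace and table name on the dot.
    static TableIdentifier fromDottedName(String dottedName) {
        // "testdb.table1" -> namespace testdb, table table1
        return TableIdentifier.parse(dottedName);
    }

    // Alternative under discussion: the namespace comes from a separate option,
    // and the table name is used as-is.
    static TableIdentifier fromConfiguredNamespace(String namespace, String tableName) {
        return TableIdentifier.of(Namespace.of(namespace), tableName);
    }
}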
Hi @leonardBang, could you help to review this?
Hi @lvyanquan, thanks for your contribution! Could you please rebase this PR with the latest master? The code style verifier has been updated to enforce the JUnit 5 + AssertJ framework, and these classes might need to be migrated.
+ "\n" | ||
+ "pipeline:\n" | ||
+ " schema.change.behavior: evolve\n" | ||
+ " parallelism: 1", |
Please add a test with larger parallelism.
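For example, the same pipeline block with a larger parallelism might look like the sketch below; the rest of the test's YAML definition is assumed unchanged:

// Hypothetical variant of the quoted fragment, exercising parallelism > 1.
String pipelineBlock =
        "pipeline:\n"
                + "  schema.change.behavior: evolve\n"
                + "  parallelism: 4";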
if (commitTimes >= compactionOptions.getCommitInterval()
        && !compactedTables.contains(tableId)) {
    if (throwable != null) {
        throw new RuntimeException(throwable);
Do we need to move this if statement to the start of the processElement method, out of if (element.getValue() instanceof CommittableWithLineage)?
import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount;

/** Util class for {@link IcebergDataSink}. */
Suggested change:

/** Util class for types in {@link IcebergDataSink}. */
public class IcebergTypeUtils {

    /** Convert column from CDC framework to Iceberg framework. */
    public static Types.NestedField convertCDCColumnToIcebergField(
Suggested change:

public static Types.NestedField convertCdcColumnToIcebergField(
case TIMESTAMP_WITH_TIME_ZONE:
    return Types.TimestampType.withZone();
default:
    throw new IllegalArgumentException("Illegal type: " + type);
throw new IllegalArgumentException("Illegal type: " + type); | |
throw new IllegalArgumentException("Unsupported cdc type in iceberg: " + type); |
    };
    break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
Is it right that TIMESTAMP_WITH_LOCAL_TIME_ZONE and TIMESTAMP_WITH_TIME_ZONE have the same code?
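For reference (not a statement about the PR's intent): Iceberg itself only distinguishes timestamps with and without a zone, so one plausible mapping is sketched below:

import org.apache.iceberg.types.Types;

class TimestampMappingSketch {
    // Zoned CDC timestamp types map to Iceberg's timestamptz.
    static Types.TimestampType zoned() {
        return Types.TimestampType.withZone();
    }

    // TIMESTAMP_WITHOUT_TIME_ZONE maps to Iceberg's plain timestamp.
    static Types.TimestampType unzoned() {
        return Types.TimestampType.withoutZone();
    }
}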
@Override
public Set<ConfigOption<?>> requiredOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    return options;
We can return new HashSet<>() directly.

Suggested change:

return new HashSet<>();
this.catalogOptions = catalogOptions;
this.tableOptions = new HashMap<>();
this.partitionMaps = new HashMap<>();
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
Suggested change:

this(catalogOptions, new HashMap<>(), new HashMap<>());
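A minimal sketch of the delegation pattern behind this suggestion; the class name, field types, and getSupportedSchemaEvolutionTypes() are placeholders standing in for the PR's actual definitions:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class MetadataApplierSketch {
    private final Map<String, String> catalogOptions;
    private final Map<String, String> tableOptions;
    private final Map<String, String> partitionMaps;
    private final Set<String> enabledSchemaEvolutionTypes;

    // The short constructor forwards to the full one instead of repeating assignments.
    MetadataApplierSketch(Map<String, String> catalogOptions) {
        this(catalogOptions, new HashMap<>(), new HashMap<>());
    }

    MetadataApplierSketch(
            Map<String, String> catalogOptions,
            Map<String, String> tableOptions,
            Map<String, String> partitionMaps) {
        this.catalogOptions = catalogOptions;
        this.tableOptions = tableOptions;
        this.partitionMaps = partitionMaps;
        this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
    }

    private Set<String> getSupportedSchemaEvolutionTypes() {
        return new HashSet<>();
    }
}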
try {
    UpdateSchema updateSchema = table.updateSchema();
    for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
        Column addColumn = columnWithPosition.getAddColumn();
Suggested change:

Column addColumn = columnWithPosition.getAddColumn();
String columnName = addColumn.getName();
LogicalType logicalType =
        FlinkSchemaUtil.convert(
                DataTypeUtils.toFlinkDataType(addColumn.getType())
                        .getLogicalType());
Use columnName and logicalType in the following code.
public void close() throws Exception {
    if (schemaMap != null) {
        schemaMap.clear();
        schemaMap = null;
Will IcebergWriter be reused?
Add Iceberg DataSink.
Notice:
This PR does not include the logic for automatically compacting small files.
Co-authored with @czy006, based on #3877.