Iceberg Source + S3 Table Sink #86
base: main
Conversation
Added some comments, mostly changes for clarity.
Please also update the top-level ./java/pom.xml to point to the renamed example and the 2 new examples.
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
  and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)

This example demonstrates how to use
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws</groupId>
<artifactId>iceberg-datastream-sink</artifactId>
Artifact should be renamed to iceberg-datastream-source
<version>${iceberg.version}</version>
</dependency>

<!-- S3 Tables Dependencies -->
Are these dependencies needed in this example?
Nope, this must be a copy-paste residual; going to go through and reduce all dependencies.
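A trimmed dependency section might end up looking like the sketch below. This is only an illustration; the exact artifact list depends on what the example actually uses, and the version properties are assumed to be defined elsewhere in the pom:

```xml
<!-- Sketch: core Iceberg/Flink integration only, S3 Tables dependencies removed -->
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.19</artifactId>
    <version>${iceberg.version}</version>
</dependency>
```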
Properties icebergProperties = applicationProperties.get("Iceberg");

// TODO: Call Iceberg Data Source
Let's create the method or remove the TODO
import java.util.*;

/**
 * Wraps the code to initialize an Iceberg sink that uses Glue Data Catalog as catalog
Explain how this works with S3 Tables.
}

// Iceberg Flink Sink Builder
public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream<GenericRecord> dataStream, org.apache.avro.Schema avroSchema) {
It would be useful to highlight with comments what is different from a standard Iceberg sink
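One way to make the differences explicit is to isolate the catalog configuration behind comments, since that is where an S3 Tables sink deviates from a standard Iceberg sink. Below is a minimal plain-Java sketch; the property keys and the catalog class name are assumptions for illustration, not confirmed by this PR:

```java
import java.util.HashMap;
import java.util.Map;

public class S3TablesCatalogProps {
    // Hypothetical sketch: build the catalog properties for an S3 Tables-backed
    // Iceberg catalog, with comments marking what differs from a standard sink.
    public static Map<String, String> build(String tableBucketArn) {
        Map<String, String> props = new HashMap<>();
        // Standard Iceberg: "warehouse" is an s3:// or file:// path.
        // S3 Tables: the table bucket ARN itself acts as the catalog location.
        props.put("warehouse", tableBucketArn);
        // A dedicated catalog implementation is used instead of the Glue/Hadoop
        // catalogs (class name is an assumption).
        props.put("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog");
        return props;
    }
}
```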
"PropertyGroupId": "Iceberg",
"PropertyMap": {

"table.bucket.arn": "REDACTED_ARN",
Just a suggestion: this could be a more evident placeholder, something like <REDACTED_ARN> or just <ARN>.
Done (for this repo); it will be a bit more work to do it for all, since the check uses a regex. Maybe we should make this a pattern going forward? But it's a bit tricky to get right.
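A repo-wide placeholder pattern could be as simple as a single regex, assuming an `<UPPER_SNAKE_CASE>` convention for placeholders. This is a hypothetical sketch of what such a check might look like, not the check currently in the repo:

```java
import java.util.regex.Pattern;

// Hypothetical helper: recognize <PLACEHOLDER>-style values in example configs,
// so a repo-wide check can flag real values left in by mistake.
public class PlaceholderCheck {
    // Matches values like <ARN>, <REDACTED_ARN>, <MY_BUCKET>
    private static final Pattern PLACEHOLDER = Pattern.compile("^<[A-Z][A-Z0-9_]*>$");

    public static boolean isPlaceholder(String value) {
        return PLACEHOLDER.matcher(value).matches();
    }
}
```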
### Known limitations
Maybe the fact that it works with 1.19 and not 1.20 can go here.
// Iceberg Flink Sink Builder
public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream<GenericRecord> dataStream, org.apache.avro.Schema avroSchema) {
    // Retrieve configuration from application parameters
    String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("table.bucket.arn"), "Iceberg S3 bucket prefix not defined");
This is an S3 Tables ARN, not a bucket prefix. I would rename the variable and fix the error message.
Also, this is an important difference between S3 Tables and Iceberg, and it may generate confusion. IMO it is worth a comment.
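The rename plus a clearer error message could look like the sketch below. It uses `Objects.requireNonNull` instead of the Preconditions call so the example is self-contained; the property key is taken from the diff above, while the class and method names are hypothetical:

```java
import java.util.Objects;
import java.util.Properties;

public class IcebergSinkConfig {
    // Hypothetical helper: the property holds an S3 Tables *table bucket ARN*,
    // not an S3 bucket prefix, so the name and error message should say so.
    public static String tableBucketArn(Properties icebergProperties) {
        return Objects.requireNonNull(
                icebergProperties.getProperty("table.bucket.arn"),
                "Iceberg table bucket ARN (table.bucket.arn) not defined");
    }
}
```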
Added comment: * This table bucket ARN will be used as the Table Catalog for Iceberg; this is unique compared to the standard Iceberg table.
Purpose of the change
Adding an Iceberg Source + S3 Table Sink. Sorry, they're in one PR; I can split them up if required.
Verifying this change
Please test your changes both running locally, in the IDE, and in Managed Service for Apache Flink. All examples must run
in both environments without code changes.
Describe how you tested your application, show the output of the running application with screenshots.
The application was tested using DataGen, and validated using SQL queries to verify that data is arriving in Iceberg format.
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterward, for convenience.)