
Iceberg Source + S3 Table Sink #86

Open · wants to merge 16 commits into base: main
Conversation

jeremyber-aws (Contributor):

Purpose of the change

Adding an Iceberg Source and an S3 Table Sink. Sorry they're in one PR; I can split them up if required.

Verifying this change

Please test your changes both running locally in the IDE and in Managed Service for Apache Flink. All examples must run
in both environments without code changes.

Describe how you tested your application and show the output of the running application with screenshots.

Application was tested using Datagen, validated using SQL queries to verify data is arriving in Iceberg format.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterward, for convenience.)

  • Completely new example
  • Updated an existing example to newer Flink version or dependencies versions
  • Improved an existing example
  • Modified the runtime configuration of an existing example (i.e. added/removed/modified any runtime properties)
  • Modified the expected input or output of an existing example (e.g. modified the source or sink, modified the record schema)

@nicusX (Contributor) left a comment:

Added some comments. Mostly changes for clarity.
Please also update the top-level ./java/pom.xml to point to the renamed example and the 2 new examples
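What this asks for could look roughly like the following, assuming the top-level ./java/pom.xml aggregates the examples as Maven modules (the module paths below are illustrative, not taken from the repository):

```xml
<!-- Hypothetical excerpt of the <modules> section in ./java/pom.xml -->
<modules>
    <!-- ...existing examples... -->
    <module>Iceberg/iceberg-datastream-sink</module>   <!-- renamed example -->
    <module>Iceberg/iceberg-datastream-source</module> <!-- new example -->
    <module>Iceberg/s3-table-sink</module>             <!-- new example -->
</modules>
```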

* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)

This example demonstrates how to use


<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws</groupId>
<artifactId>iceberg-datastream-sink</artifactId>
Contributor:

Artifact should be renamed to iceberg-datastream-source

<version>${iceberg.version}</version>
</dependency>

<!-- S3 Tables Dependencies -->
Contributor:

Are these dependencies needed in this example?

Contributor (Author):

Nope, this must be a copy-paste residual; going to go through and reduce all dependencies.


Properties icebergProperties = applicationProperties.get("Iceberg");

// TODO: Call Iceberg Data Source
Contributor:

Let's create the method or remove the TODO

import java.util.*;

/**
* Wraps the code to initialize an Iceberg sink that uses Glue Data Catalog as catalog
Contributor:

Explain how this works with S3 Tables

}

// Iceberg Flink Sink Builder
public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream<GenericRecord> dataStream, org.apache.avro.Schema avroSchema) {
Contributor:

It would be useful to highlight with comments what is different from a standard Iceberg sink

"PropertyGroupId": "Iceberg",
"PropertyMap": {

"table.bucket.arn": "REDACTED_ARN",
Contributor:

Just a suggestion: this could be a more evident placeholder. Something like <REDACTED_ARN> or just <ARN>
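With that suggestion applied, the runtime property could look like this (the surrounding JSON shape is reconstructed from the fragment quoted above; the placeholder name is illustrative):

```json
{
    "PropertyGroupId": "Iceberg",
    "PropertyMap": {
        "table.bucket.arn": "<TABLE_BUCKET_ARN>"
    }
}
```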

Contributor (Author):

Done (for this repo); it will be a bit more work to do it for all, as the check uses a regex. Maybe we should make this a pattern going forward? But it's a bit tricky to get right.
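The regex check mentioned above could be sketched as follows. The repository's actual pattern is not shown in this thread, so this is an illustrative, stdlib-only example that distinguishes an angle-bracket placeholder like `<TABLE_BUCKET_ARN>` from a real-looking S3 Tables table bucket ARN:

```java
import java.util.regex.Pattern;

public class ArnPlaceholderCheck {
    // Illustrative patterns; the repository's actual regex may differ.
    // Matches an angle-bracket placeholder such as <ARN> or <TABLE_BUCKET_ARN>.
    static final Pattern PLACEHOLDER = Pattern.compile("<[A-Z_]+>");

    // Rough shape of a real S3 Tables table bucket ARN:
    // arn:aws:s3tables:<region>:<account-id>:bucket/<name>
    static final Pattern REAL_ARN =
            Pattern.compile("arn:aws:s3tables:[a-z0-9-]+:\\d{12}:bucket/.+");

    static boolean isPlaceholder(String value) {
        return PLACEHOLDER.matcher(value).matches();
    }

    static boolean looksLikeRealArn(String value) {
        return REAL_ARN.matcher(value).matches();
    }

    public static void main(String[] args) {
        System.out.println(isPlaceholder("<TABLE_BUCKET_ARN>")); // true
        System.out.println(looksLikeRealArn(
                "arn:aws:s3tables:us-east-1:123456789012:bucket/my-tables")); // true
    }
}
```

Checking both shapes is what makes the "pattern going forward" tricky: the validation has to accept every documented placeholder style while still rejecting accidentally committed real ARNs.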



### Known limitations

Contributor:

Maybe the fact that it works with Flink 1.19 and not 1.20 can go here

// Iceberg Flink Sink Builder
public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream<GenericRecord> dataStream, org.apache.avro.Schema avroSchema) {
// Retrieve configuration from application parameters
String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("table.bucket.arn"), "Iceberg S3 bucket prefix not defined");
Contributor:

This is an S3Table ARN, not a bucket prefix. I would rename the variable and fix the error message.

Contributor:

Also, this is an important difference of S3Tables vs Iceberg, and it may generate confusion. IMO it is worth a comment

Contributor (Author):

Added comment: "This table bucket ARN will be used as the table catalog for Iceberg; this is unique compared to a standard Iceberg table."

2 participants