Skip to content

Commit

Permalink
Flink: FLIP-27 IcebergSource builder missed a couple of configs compa…
Browse files Browse the repository at this point in the history
…red to old FlinkSource: expose locality and plan parallelism (apache#10957)
  • Loading branch information
stevenzwu authored Aug 19, 2024
1 parent ed07fd1 commit 43bbf08
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ public IcebergSource<T> build() {
}

contextBuilder.resolveConfig(table, readOptions, flinkConfig);
contextBuilder.exposeLocality(
SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
contextBuilder.planParallelism(
flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ public IcebergSource<T> build() {
}

contextBuilder.resolveConfig(table, readOptions, flinkConfig);
contextBuilder.exposeLocality(
SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
contextBuilder.planParallelism(
flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ public IcebergSource<T> build() {
}

contextBuilder.resolveConfig(table, readOptions, flinkConfig);
contextBuilder.exposeLocality(
SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
contextBuilder.planParallelism(
flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
Expand Down

0 comments on commit 43bbf08

Please sign in to comment.