Add ability to specify partition bounds via jdbc xlang. #34394

Open: wants to merge 2 commits into base: master

claudevdm
Collaborator

@claudevdm claudevdm commented Mar 23, 2025

This lets users supply partition bounds explicitly, so the pipeline can skip the extra step of calculating them during execution. It also makes the xlang transform more configurable.
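To illustrate what explicit bounds make possible, here is a minimal sketch of how a lower and upper bound could be divided into per-partition ranges without any database lookup. It is not JdbcIO's actual implementation: the class `PartitionRanges`, the method `split`, and the half-open `[lower, upper)` convention are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Beam: splits [lower, upper) into contiguous ranges. */
final class PartitionRanges {
  private PartitionRanges() {}

  /** Returns at most numPartitions half-open ranges {start, end} that cover [lower, upper). */
  static List<long[]> split(long lower, long upper, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    if (lower >= upper) {
      throw new IllegalArgumentException("lower must be less than upper");
    }
    // subtractExact fails loudly instead of silently overflowing on extreme bounds.
    long span = Math.subtractExact(upper, lower);
    // Ceiling division so the ranges never exceed numPartitions.
    long stride = span / numPartitions + (span % numPartitions == 0 ? 0 : 1);
    List<long[]> ranges = new ArrayList<>();
    long start = lower;
    while (start < upper) {
      // Clamp the final range to the upper bound.
      long end = (upper - start <= stride) ? upper : start + stride;
      ranges.add(new long[] {start, end});
      start = end;
    }
    return ranges;
  }
}
```

With bounds 0 and 100 and 5 partitions, this sketch yields five ranges of width 20. When the span is smaller than the requested partition count, it yields fewer ranges than requested.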

@claudevdm claudevdm requested a review from Abacn March 24, 2025 15:40
@claudevdm claudevdm marked this pull request as ready for review March 24, 2025 15:40
@@ -72,6 +72,8 @@ public Schema configurationSchema() {
// readQuery
.addNullableField("partitionColumn", FieldType.STRING)
.addNullableField("partitions", FieldType.INT16)
.addNullableField("longLowerBound", FieldType.INT64)
Contributor

This only works for long. Currently there is built-in support for Long and Datetime. I am wondering whether this would benefit from a more flexible parameter design, e.g. passing the bounds as strings together with a partition type name as SchemaIO parameters.
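As a rough sketch of the flexible design suggested above, the bounds could travel as strings next to a type name and be parsed into a typed value on the Java side. Everything here is hypothetical: `BoundParser`, the type names `"long"` and `"datetime"`, and the use of `java.time.Instant` as a stand-in for whatever datetime type the provider would actually use.

```java
import java.time.Instant;

/** Hypothetical helper, not part of Beam: parses a string bound according to a type name. */
final class BoundParser {
  private BoundParser() {}

  /** Converts the raw string into a typed bound; rejects type names it does not know. */
  static Object parse(String typeName, String raw) {
    switch (typeName) {
      case "long":
        return Long.parseLong(raw);
      case "datetime":
        // Assumes ISO-8601 input such as 2025-03-24T00:00:00Z.
        return Instant.parse(raw);
      default:
        throw new IllegalArgumentException("Unsupported partition type: " + typeName);
    }
  }
}
```

Under this design, supporting a new partition type would mean adding one `case` rather than a new pair of schema fields, at the cost of losing type checking on the configuration itself.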

Collaborator Author

I think it is simpler to have strict types in the JdbcSchemaIoProvider for partitions, and in Python [1] we can change lower_bound to be Union[int, datetime]. Then we just set the relevant type in the cross-language config; the user doesn't need to do anything special.

But I decided not to do this right now because:

  1. Right now cross-language only supports long partitions. This is because it uses the default ReadWithPartitions method [2].
  2. I think the majority of people are OK with long partitions, since xlang never supported dates to begin with.

Maybe we can add support for date partitions in a follow-up?

[1]

lower_bound=None,

[2]
return JdbcIO.<T, Long>readWithPartitions(TypeDescriptors.longs());

Contributor

We can just support long for now. However, this is public API surface and we need to consider extensibility. Say we are going to support other lower/upper bound types in the future: every time a type is added, someone needs to add a new field to the SchemaIO providers and touch multiple files.

Collaborator Author

I think keeping stricter types here simplifies things, because read with partitions only supports dates and integers. If we think there is a chance we will add more partition types, then a more flexible approach might be worth it. If you feel strongly about this, I can change it to the flexible approach.

Contributor

Sounds good, I am fine with both. There is a chance that introducing new configurations is not needed at all; please see the comments in #34394 (comment).

@Abacn
Contributor

Abacn commented Mar 24, 2025

also if the purpose is

bypassing the requirement to grant database access to machine where pipeline is built/submitted for partition bound inference.

we should be able to change here

JdbcIO.<KV<Long, KV<PartitionColumnT, PartitionColumnT>>>read()

to use an explicit schema if the partition type is set at pipeline expansion time (with no need to pass in lower and upper bounds)

@claudevdm
Collaborator Author

also if the purpose is

bypassing the requirement to grant database access to machine where pipeline is built/submitted for partition bound inference.

we should be able to change here

JdbcIO.<KV<Long, KV<PartitionColumnT, PartitionColumnT>>>read()

to use an explicit schema if the partition type is set at pipeline expansion time (with no need to pass in lower and upper bounds)

I don't follow. From my experience there are two cases where the database is accessed during expansion time:

  1. Infer schema
  2. Get partition bounds

For ReadWithPartitions, both the partition bounds and the schema need to be explicitly provided to avoid database queries during expansion time. There is already support for explicitly providing a schema.

Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @liferoad for label python.
R: @kennknowles for label java.
R: @Abacn for label build.
R: @shunping for label io.

@Abacn
Contributor

Abacn commented Mar 24, 2025

there are two cases where the database is accessed during expansion time:

  1. Get partition bounds

This is because getting the partition bounds is done by a JdbcIO.read transform (pointed to above, JdbcIO.java#L1513); as a result, it tries to infer a schema.

But this is not needed if we already know it will be a long. I mean we can set the schema to FieldType.Long here if we know it's going to be long. Thanks to your previous change, we can now avoid inferring the schema for the JdbcIO.read transform in general.

Then getting the partition bounds and JdbcIO.readFromPartition won't require a database connection at pipeline submission time.

I don't follow. From my experience there are two cases where the database is accessed during expansion time:

I mean, if the goal is to:

  • remove expansion time database connection requirement for xlang jdbcio.readfrompartition

we can just change the JdbcIO.read transform (pointed to above, JdbcIO.java#L1513) added by jdbcio.readfrompartition to explicitly set a schema (Long). This is a much smaller change than introducing parameters, and the user does not need to set these parameters either.

@Abacn
Contributor

Abacn commented Mar 24, 2025

we can just change the JdbcIO.read transform (pointed to above, JdbcIO.java#L1513) added by jdbcio.readfrompartition to explicitly set a schema (Long). This is a much smaller change than introducing parameters, and the user does not need to set these parameters either.

Sorry, I see: this is a JdbcIO.read instead of JdbcIO.readRows, so it does not need a database connection at expansion time. Where does the database connection happen currently?

@claudevdm
Collaborator Author

we can just change the JdbcIO.read transform (pointed to above, JdbcIO.java#L1513) added by jdbcio.readfrompartition to explicitly set a schema (Long). This is a much smaller change than introducing parameters, and the user does not need to set these parameters either.

Sorry, I see: this is a JdbcIO.read instead of JdbcIO.readRows, so it does not need a database connection at expansion time. Where does the database connection happen currently?

Hmm, I actually thought it tries to get the partition bounds during pipeline expansion, but looking again I was mistaken :D. Thanks for pointing this out.

So this change isn't really necessary to avoid database access during expansion. But we might as well add it, to give users the ability to explicitly pass partition bounds and avoid extra database calls during execution.
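The idea of skipping the extra database call when bounds are supplied can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: `BoundsResolver` and `resolve` are hypothetical names, the `Supplier` stands in for the bounds query that would otherwise run during execution, and the "both or neither" rule is an assumed policy.

```java
import java.util.function.Supplier;

/** Hypothetical helper, not part of Beam: prefers explicit bounds over a lookup. */
final class BoundsResolver {
  private BoundsResolver() {}

  /**
   * Returns {lower, upper}. The queryBounds supplier (a stand-in for the database
   * lookup) is only invoked when neither bound was provided.
   */
  static long[] resolve(Long lower, Long upper, Supplier<long[]> queryBounds) {
    if (lower != null && upper != null) {
      // Explicit bounds: no database round trip needed.
      return new long[] {lower, upper};
    }
    if (lower != null || upper != null) {
      // Assumed policy for this sketch: a single bound is treated as a config error.
      throw new IllegalArgumentException("Provide both bounds or neither");
    }
    return queryBounds.get();
  }
}
```

Passing a supplier that throws is a simple way to confirm in a test that explicit bounds never trigger the lookup.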

@Abacn
Contributor

Abacn commented Mar 24, 2025

But we might as well add it, to give users the ability to explicitly pass partition bounds and avoid extra database calls during execution.

For now I am marginally fine with the change (original comments in #34394 (comment)). For decisions about public-facing API, would it be good to involve an additional reviewer for another opinion?

@claudevdm claudevdm requested a review from damccorm March 24, 2025 19:27
.withFieldValue("password", "")
.withFieldValue("partitionColumn", "id")
.withFieldValue("partitions", (short) 5)
.withFieldValue("longLowerBound", 0L)
Member

nit: leave the type out of the field name

These names are user-facing (unless we do work to hide them) in managed IO, YAML, etc.
