-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to specify partition bounds via jdbc xlang. #34394
base: master
Are you sure you want to change the base?
Conversation
b1d62d4
to
09e496e
Compare
09e496e
to
2537278
Compare
@@ -72,6 +72,8 @@ public Schema configurationSchema() { | |||
// readQuery | |||
.addNullableField("partitionColumn", FieldType.STRING) | |||
.addNullableField("partitions", FieldType.INT16) | |||
.addNullableField("longLowerBound", FieldType.INT64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only works for long. Currently there is builtin support for Long and Datetime. I am thinking about if it would benefits from a more flexible parameter design, e.g. pass in string and partition type names as SchemaIO parameters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is simpler to have strict types in the JdbcSchemaIoProvider for partitions, and in python [1] we can change lower_bound to be Union[int, datetime]. Then we just set the relevant type in the crosslang config, the user doesnt need to do anything special.
But I decided not to do this right now because
- Right now crosslang only supports long partitions. This is because it uses the default ReadWithPartitions method [2]
- I am thinking majority of people are ok with using long partitions since xlang never even supported dates to begin with
Maybe we can add support for date partitions in a followup?
[1]
beam/sdks/python/apache_beam/io/jdbc.py
Line 308 in 8b02feb
lower_bound=None, |
[2]
return JdbcIO.<T, Long>readWithPartitions(TypeDescriptors.longs()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just support long for now, However this is public API surface and we need to consider extendability. Saying we are going to support other lower/upper bound types in the future, then every time a type added one needs to add a new field to schemaio providers and touch multiple files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think keeping stricter types here simplifies things because read with partitions only supports dates and integers, if we think there is a chance we will add more partition types then a more flexible approach might be worth it? If you feel strongly about this I can change this to the flexible approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, I am fine with both. There is a chance introducing new configurations are not needed at all, please see comments #34394 (comment)
also if the purpose is
we should be able to change here
to use explicit schema, if partition type is set at pipeline expansion time (and not need to pass in lower and upper bound) |
I don't follow. From my experience there are two cases where the database is accessed during expansion time:
For ReadWithPartitions, both partition bounds and schema need to be explicitly provided to avoid database queries during expansion time. There is already support for explicitly providing schema
|
Assigning reviewers. If you would like to opt out of this review, comment R: @liferoad for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
This is because Get partition bounds is done by a JdbcIO.read transform (pointed above, JdbcIO.java#L1513), as a result, it tries to infer a schema. But this is not needed if we already know it would be a long. I mean we can set schema to FieldType.Long here, if we know it's going to be long, thanks to your previous change now we can avoid infer schema for JdbcIO.read transform in general. then Get partition bounds and JdbcIO.readFromPartition won't require database connection at pipeline submission time
I mean, if the goal is to:
we can just change the JdbcIO.read transform (pointed above, JdbcIO.java#L1513) added by jdbcio.readfrompartition to explicitly set a schema (Long), this is a much smaller change than introducing parameters, and user does not need to set these parameters also |
sorry, I see, this is a JdbcIO.read instead of JdbcIO.readRows, so it does not need database connection at expansion time. Where is the database connection happens currently? |
Hmm, I actually thought it tries to get the partition bounds during pipeline expand, but looking again I was mistaken :D. Thanks for pointing this out. So this change isnt really necessary to avoid database access during expansion. But we might as well add it to give the ability to explicitly pass partition bounds to avoid extra database calls during execution. |
For now I am marginally fine with the change (original comments in #34394 (comment)). For decisions of publicly faced API, good to involve additional reviewer to have an opinion? |
.withFieldValue("password", "") | ||
.withFieldValue("partitionColumn", "id") | ||
.withFieldValue("partitions", (short) 5) | ||
.withFieldValue("longLowerBound", 0L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: leave the type out of the field name
These names are user-facing (unless we do work to hide them) in managed IO, YAML, etc
This allows bypassing partition bound calculation as an additional step during pipeline execution. Adds more customized support to the xlang transform.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.