Skip to content

Commit 09e496e

Browse files
committed
Initial.
1 parent 3595a33 commit 09e496e

File tree

5 files changed

+70
-3
lines changed

5 files changed

+70
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 5
3+
"modification": 6
44
}

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java

+14
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public Schema configurationSchema() {
7272
// readQuery
7373
.addNullableField("partitionColumn", FieldType.STRING)
7474
.addNullableField("partitions", FieldType.INT16)
75+
.addNullableField("longLowerBound", FieldType.INT64) // For numeric bounds
76+
.addNullableField("longUpperBound", FieldType.INT64) // For numeric bounds
7577
.addNullableField("maxConnections", FieldType.INT16)
7678
.addNullableField("driverJars", FieldType.STRING)
7779
.addNullableField("writeBatchSize", FieldType.INT64)
@@ -139,6 +141,18 @@ public PCollection<Row> expand(PBegin input) {
139141
readRows = readRows.withNumPartitions(partitions);
140142
}
141143

144+
if (config.getSchema().hasField("longLowerBound")
145+
&& config.getSchema().hasField("longUpperBound")
146+
&& config.getInt64("longLowerBound") != null
147+
&& config.getInt64("longUpperBound") != null) {
148+
readRows =
149+
((JdbcIO.ReadWithPartitions<Row, Long>) readRows)
150+
.withLowerBound(
151+
Preconditions.checkStateNotNull(config.getInt64("longLowerBound")))
152+
.withUpperBound(
153+
Preconditions.checkStateNotNull(config.getInt64("longUpperBound")));
154+
}
155+
142156
@Nullable Short fetchSize = config.getInt16("fetchSize");
143157
if (fetchSize != null) {
144158
readRows = readRows.withFetchSize(fetchSize);

sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java

+25
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,31 @@ public void testPartitionedReadWithExplicitSchema() {
131131
pipeline.run();
132132
}
133133

134+
@Test
135+
public void testPartitionedReadWithExplicitLongBounds() {
136+
JdbcSchemaIOProvider provider = new JdbcSchemaIOProvider();
137+
138+
Row config =
139+
Row.withSchema(provider.configurationSchema())
140+
.withFieldValue("driverClassName", DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
141+
.withFieldValue("jdbcUrl", DATA_SOURCE_CONFIGURATION.getUrl().get())
142+
.withFieldValue("username", "")
143+
.withFieldValue("password", "")
144+
.withFieldValue("partitionColumn", "id")
145+
.withFieldValue("partitions", (short) 5)
146+
.withFieldValue("longLowerBound", 0L)
147+
.withFieldValue("longUpperBound", (long) EXPECTED_ROW_COUNT / 2)
148+
.build();
149+
150+
JdbcSchemaIOProvider.JdbcSchemaIO schemaIO =
151+
provider.from(READ_TABLE_NAME, config, Schema.builder().build());
152+
153+
PCollection<Row> output = pipeline.apply(schemaIO.buildReader());
154+
Long expected = Long.valueOf(EXPECTED_ROW_COUNT / 2 + 1);
155+
PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
156+
pipeline.run();
157+
}
158+
134159
@Test
135160
public void testReadWithExplicitSchema() {
136161
JdbcSchemaIOProvider provider = new JdbcSchemaIOProvider();

sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py

+20
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,26 @@ def test_xlang_jdbc_write_read(self, database):
288288

289289
assert_that(result, equal_to(expected_rows))
290290

291+
# Try the same read using the partitioned reader code path.
292+
# with explicit partition bounds.
293+
with TestPipeline() as p:
294+
p.not_use_test_runner_api = True
295+
result = (
296+
p
297+
| 'Partitioned read from jdbc' >> ReadFromJdbc(
298+
table_name=table_name,
299+
partition_column='f_id',
300+
partitions=3,
301+
lower_bound=0,
302+
upper_bound=2,
303+
driver_class_name=config['driver_class_name'],
304+
jdbc_url=config['jdbc_url'],
305+
username=config['username'],
306+
password=config['password'],
307+
classpath=config['classpath']))
308+
309+
assert_that(result, equal_to(expected_rows[:3]))
310+
291311
@parameterized.expand(['postgres', 'mysql'])
292312
def test_xlang_jdbc_read_with_explicit_schema(self, database):
293313
if self.containers[database] is None:

sdks/python/apache_beam/io/jdbc.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ def default_io_expansion_service(classpath=None):
131131
('autosharding', typing.Optional[bool]),
132132
('partition_column', typing.Optional[str]),
133133
('partitions', typing.Optional[np.int16]),
134+
('long_lower_bound', typing.Optional[np.int64]),
135+
('long_upper_bound', typing.Optional[np.int64]),
134136
('max_connections', typing.Optional[np.int16]),
135137
('driver_jars', typing.Optional[str]),
136138
('write_batch_size', typing.Optional[np.int64])],
@@ -250,7 +252,9 @@ def __init__(
250252
max_connections=max_connections,
251253
driver_jars=driver_jars,
252254
partitions=None,
253-
partition_column=None)),
255+
partition_column=None,
256+
long_lower_bound=None,
257+
long_upper_bound=None)),
254258
dataSchema=None),
255259
),
256260
expansion_service or default_io_expansion_service(classpath),
@@ -301,6 +305,8 @@ def __init__(
301305
fetch_size=None,
302306
partition_column=None,
303307
partitions=None,
308+
lower_bound=None,
309+
upper_bound=None,
304310
connection_properties=None,
305311
connection_init_sqls=None,
306312
max_connections=None,
@@ -381,7 +387,9 @@ def __init__(
381387
max_connections=max_connections,
382388
driver_jars=driver_jars,
383389
partition_column=partition_column,
384-
partitions=partitions)),
390+
partitions=partitions,
391+
long_lower_bound=lower_bound,
392+
long_upper_bound=upper_bound)),
385393
dataSchema=dataSchema),
386394
),
387395
expansion_service or default_io_expansion_service(classpath),

0 commit comments

Comments
 (0)