Skip to content

Commit

Permalink
Added option to use streamable result sets
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Oct 11, 2024
1 parent ed8a3b9 commit f8b825d
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 7 deletions.
18 changes: 18 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
public class QueryServiceExecutor extends BaseYdbExecutor {
private final Duration sessionTimeout;
private final QueryClient queryClient;
private final boolean useStreamResultSet;

private int transactionLevel;
private boolean isReadOnly;
Expand All @@ -56,6 +57,8 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo
super(ctx);
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
this.queryClient = ctx.getQueryClient();
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();

this.transactionLevel = transactionLevel;
this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE;
this.isAutoCommit = autoCommit;
Expand Down Expand Up @@ -227,6 +230,21 @@ public YdbQueryResult executeDataQuery(
tx = createNewQuerySession(validator).createNewTransaction(txMode);
}

if (useStreamResultSet) {
String msg = "STREAM_QUERY >>\n" + yql;
StreamQueryResult lazy = validator.call(msg, () -> {
QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings);
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
return result.execute(stream, () -> {
if (!tx.isActive()) {
cleanTx();
}
});
});

return updateCurrentResult(lazy);
}

try {
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql,
() -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings))
Expand Down
3 changes: 2 additions & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class YdbConfig {
+ "{@code 0} disables the cache.", 256
);
static final YdbProperty<Boolean> USE_QUERY_SERVICE = YdbProperty.bool("useQueryService",
"Use QueryService intead of TableService", false
"Use QueryService instead of TableService", false
);
static final YdbProperty<Boolean> FULLSCAN_DETECTOR_ENABLED = YdbProperty.bool(
"jdbcFullScanDetector", "Enable analizator for collecting query stats", false
Expand Down Expand Up @@ -167,6 +167,7 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException {
YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties),
YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties),

YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties),
YdbOperationProperties.JOIN_DURATION.toInfo(properties),
YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties),
YdbOperationProperties.SCAN_QUERY_TIMEOUT.toInfo(properties),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public class YdbOperationProperties {
FakeTxMode.ERROR
);

static final YdbProperty<Boolean> USE_STREAM_RESULT_SETS = YdbProperty.bool("useStreamResultSets",
"Use stream implementation of ResultSet", false
);

private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection?

private final YdbValue<Duration> joinDuration;
Expand All @@ -68,6 +72,8 @@ public class YdbOperationProperties {
private final YdbValue<FakeTxMode> schemeQueryTxMode;
private final YdbValue<FakeTxMode> bulkQueryTxMode;

private final YdbValue<Boolean> useStreamResultSets;

public YdbOperationProperties(YdbConfig config) throws SQLException {
Properties props = config.getProperties();

Expand All @@ -83,6 +89,8 @@ public YdbOperationProperties(YdbConfig config) throws SQLException {
this.scanQueryTxMode = SCAN_QUERY_TX_MODE.readValue(props);
this.schemeQueryTxMode = SCHEME_QUERY_TX_MODE.readValue(props);
this.bulkQueryTxMode = BULK_QUERY_TX_MODE.readValue(props);

this.useStreamResultSets = USE_STREAM_RESULT_SETS.readValue(props);
}

public Duration getJoinDuration() {
Expand Down Expand Up @@ -129,6 +137,10 @@ public int getTransactionLevel() {
return transactionLevel.getValue();
}

public boolean getUseStreamResultSets() {
return useStreamResultSets.getValue();
}

public int getMaxRows() {
return MAX_ROWS;
}
Expand Down
128 changes: 128 additions & 0 deletions jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,132 @@ public void forceScanAndBulkTest() throws SQLException {
}
}
}

@Test
public void streamResultsTest() throws SQLException {
try (Connection conn = DriverManager.getConnection(jdbcURL
.withArg("useQueryService", "true")
.withArg("useStreamResultSets", "true")
.build()
)) {
try {
conn.createStatement().execute(DROP_TABLE);
} catch (SQLException e) {
// ignore
}

conn.createStatement().execute(CREATE_TABLE);

LocalDate ld = LocalDate.of(2017, 12, 3);
String prefix = "text-value-";
int idx = 0;

// single batch upsert
try (PreparedStatement ps = conn.prepareStatement(UPSERT_ROW)) {
ps.setInt(1, ++idx);
ps.setString(2, prefix + idx);
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
ps.executeUpdate();
}

// single batch insert
try (PreparedStatement ps = conn.prepareStatement(INSERT_ROW)) {
ps.setInt(1, ++idx);
ps.setString(2, prefix + idx);
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
ps.executeUpdate();
}

// stream read
try (Statement st = conn.createStatement()) {
int readed = 0;
try (ResultSet rs = st.executeQuery(SELECT_ALL)) {
while (rs.next()) {
readed++;
Assertions.assertEquals(readed, rs.getInt("id"));
Assertions.assertEquals(prefix + readed, rs.getString("value"));
Assertions.assertEquals(Date.valueOf(ld.plusDays(readed)), rs.getDate("date"));
}
}
Assertions.assertEquals(2, readed);
}

// batch upsert
try (PreparedStatement ps = conn.prepareStatement(UPSERT_ROW)) {
for (int j = 0; j < 2000; j++) {
ps.setInt(1, ++idx);
ps.setString(2, prefix + idx);
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
ps.addBatch();
}
ps.executeBatch();

// single row upsert
ps.setInt(1, ++idx);
ps.setString(2, prefix + idx);
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
ps.execute();

for (int j = 0; j < 2000; j++) {
ps.setInt(1, ++idx);
ps.setString(2, prefix + idx);
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
ps.addBatch();
}
ps.executeBatch();
}

// batch inserts
try (PreparedStatement ps = conn.prepareStatement(INSERT_ROW)) {
for (int j = 0; j < 2000; j++) {
ps.setInt(1, ++idx);
ps.setString(2, prefix + idx);
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
ps.addBatch();
}
ps.executeBatch();

// single row insert
ps.setInt(1, ++idx);
ps.setString(2, prefix + idx);
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
ps.execute();

for (int j = 0; j < 2000; j++) {
ps.setInt(1, ++idx);
ps.setString(2, prefix + idx);
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
ps.addBatch();
}
ps.executeBatch();
}

// read all
try (Statement st = conn.createStatement()) {
int readed = 0;
try (ResultSet rs = st.executeQuery(SELECT_ALL)) {
while (rs.next()) {
readed++;
Assertions.assertEquals(readed, rs.getInt("id"));
Assertions.assertEquals(prefix + readed, rs.getString("value"));
Assertions.assertEquals(Date.valueOf(ld.plusDays(readed)), rs.getDate("date"));
}
}
Assertions.assertEquals(8004, readed);
}

// single update
try (PreparedStatement ps = conn.prepareStatement(UPDATE_ROW)) {
ps.setString(1, "updated-value");
ps.setInt(2, 1);
ps.executeUpdate();
}

// single delete
try (PreparedStatement ps = conn.prepareStatement(DELETE_ROW)) {
ps.setInt(1, 2);
ps.executeUpdate();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -879,15 +879,15 @@ private String createPayload(Random rnd, int length) {
@Timeout(value = 30, unit = TimeUnit.SECONDS, threadMode = Timeout.ThreadMode.SAME_THREAD)
public void testBigBulkAndScan() throws SQLException {
String bulkUpsert = QUERIES.upsertOne(SqlQueries.JdbcQuery.BULK, "c_Text", "Text?");
String scanSelectAll = QUERIES.scanSelectSQL();
String selectAll = QUERIES.selectSQL();
String selectOne = QUERIES.selectAllByKey("?");

Random rnd = new Random(0x234567);
int payloadLength = 1000;

try {
try (Connection conn = jdbc.createCustomConnection("useStreamResultSets", "true")) {
// BULK UPSERT
try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) {
try (PreparedStatement ps = conn.prepareStatement(bulkUpsert)) {
for (int idx = 1; idx <= 10000; idx++) {
ps.setInt(1, idx);
String payload = createPayload(rnd, payloadLength);
Expand All @@ -901,7 +901,7 @@ public void testBigBulkAndScan() throws SQLException {
}

// SCAN all table
try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) {
try (PreparedStatement ps = conn.prepareStatement(selectAll)) {
int readed = 0;
Assertions.assertTrue(ps.execute());
try (ResultSet rs = ps.getResultSet()) {
Expand All @@ -915,7 +915,7 @@ public void testBigBulkAndScan() throws SQLException {
}

// Canceled scan
try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) {
try (PreparedStatement ps = conn.prepareStatement(selectAll)) {
Assertions.assertTrue(ps.execute());
ps.getResultSet().next();
ps.getResultSet().close();
Expand All @@ -933,7 +933,7 @@ public void testBigBulkAndScan() throws SQLException {
}

// Scan was cancelled, but connection still work
try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) {
try (PreparedStatement ps = conn.prepareStatement(selectOne)) {
ps.setInt(1, 1234);

Assertions.assertTrue(ps.execute());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ static DriverPropertyInfo[] defaultPropertyInfo(@Nullable String localDatacenter
new DriverPropertyInfo("sessionMaxIdleTime", ""),
new DriverPropertyInfo("sessionPoolSizeMin", ""),
new DriverPropertyInfo("sessionPoolSizeMax", ""),
new DriverPropertyInfo("useStreamResultSets", "false"),
new DriverPropertyInfo("joinDuration", "5m"),
new DriverPropertyInfo("queryTimeout", "0s"),
new DriverPropertyInfo("scanQueryTimeout", "5m"),
Expand Down Expand Up @@ -358,6 +359,7 @@ static DriverPropertyInfo[] customizedPropertyInfo() {
new DriverPropertyInfo("sessionMaxIdleTime", "5m"),
new DriverPropertyInfo("sessionPoolSizeMin", "3"),
new DriverPropertyInfo("sessionPoolSizeMax", "4"),
new DriverPropertyInfo("useStreamResultSets", "true"),
new DriverPropertyInfo("joinDuration", "6m"),
new DriverPropertyInfo("queryTimeout", "2m"),
new DriverPropertyInfo("scanQueryTimeout", "3m"),
Expand Down

0 comments on commit f8b825d

Please sign in to comment.