diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index 5955f88..713a5a0 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -43,6 +43,7 @@ public class QueryServiceExecutor extends BaseYdbExecutor { private final Duration sessionTimeout; private final QueryClient queryClient; + private final boolean useStreamResultSet; private int transactionLevel; private boolean isReadOnly; @@ -56,6 +57,8 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo super(ctx); this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout(); this.queryClient = ctx.getQueryClient(); + this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets(); + this.transactionLevel = transactionLevel; this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE; this.isAutoCommit = autoCommit; @@ -227,6 +230,21 @@ public YdbQueryResult executeDataQuery( tx = createNewQuerySession(validator).createNewTransaction(txMode); } + if (useStreamResultSet) { + String msg = "STREAM_QUERY >>\n" + yql; + StreamQueryResult lazy = validator.call(msg, () -> { + QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings); + StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); + return result.execute(stream, () -> { + if (!tx.isActive()) { + cleanTx(); + } + }); + }); + + return updateCurrentResult(lazy); + } + try { QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql, () -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings)) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java index 4411fd2..c8778ed 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java @@ -38,7 +38,7 @@ public class YdbConfig { + "{@code 0} disables the cache.", 256 ); static final YdbProperty USE_QUERY_SERVICE = YdbProperty.bool("useQueryService", - "Use QueryService intead of TableService", false + "Use QueryService instead of TableService", false ); static final YdbProperty FULLSCAN_DETECTOR_ENABLED = YdbProperty.bool( "jdbcFullScanDetector", "Enable analizator for collecting query stats", false @@ -167,6 +167,7 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException { YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties), YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties), + YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties), YdbOperationProperties.JOIN_DURATION.toInfo(properties), YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties), YdbOperationProperties.SCAN_QUERY_TIMEOUT.toInfo(properties), diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java index 41f939d..74c7bd6 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java @@ -53,6 +53,10 @@ public class YdbOperationProperties { FakeTxMode.ERROR ); + static final YdbProperty USE_STREAM_RESULT_SETS = YdbProperty.bool("useStreamResultSets", + "Use stream implementation of ResultSet", false + ); + private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection? private final YdbValue joinDuration; @@ -68,6 +72,8 @@ public class YdbOperationProperties { private final YdbValue schemeQueryTxMode; private final YdbValue bulkQueryTxMode; + private final YdbValue useStreamResultSets; + public YdbOperationProperties(YdbConfig config) throws SQLException { Properties props = config.getProperties(); @@ -83,6 +89,8 @@ public YdbOperationProperties(YdbConfig config) throws SQLException { this.scanQueryTxMode = SCAN_QUERY_TX_MODE.readValue(props); this.schemeQueryTxMode = SCHEME_QUERY_TX_MODE.readValue(props); this.bulkQueryTxMode = BULK_QUERY_TX_MODE.readValue(props); + + this.useStreamResultSets = USE_STREAM_RESULT_SETS.readValue(props); } public Duration getJoinDuration() { @@ -129,6 +137,10 @@ public int getTransactionLevel() { return transactionLevel.getValue(); } + public boolean getUseStreamResultSets() { + return useStreamResultSets.getValue(); + } + public int getMaxRows() { return MAX_ROWS; } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java b/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java index 7982858..e1070f3 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java @@ -313,4 +313,132 @@ public void forceScanAndBulkTest() throws SQLException { } } } + + @Test + public void streamResultsTest() throws SQLException { + try (Connection conn = DriverManager.getConnection(jdbcURL + .withArg("useQueryService", "true") + .withArg("useStreamResultSets", "true") + .build() + )) { + try { + conn.createStatement().execute(DROP_TABLE); + } catch (SQLException e) { + // ignore + } + + conn.createStatement().execute(CREATE_TABLE); + + LocalDate ld = LocalDate.of(2017, 12, 3); + String prefix = "text-value-"; + int idx = 0; + + // single batch upsert + try (PreparedStatement ps = conn.prepareStatement(UPSERT_ROW)) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.executeUpdate(); + } + + // single batch insert + try (PreparedStatement ps = conn.prepareStatement(INSERT_ROW)) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.executeUpdate(); + } + + // stream read + try (Statement st = conn.createStatement()) { + int readed = 0; + try (ResultSet rs = st.executeQuery(SELECT_ALL)) { + while (rs.next()) { + readed++; + Assertions.assertEquals(readed, rs.getInt("id")); + Assertions.assertEquals(prefix + readed, rs.getString("value")); + Assertions.assertEquals(Date.valueOf(ld.plusDays(readed)), rs.getDate("date")); + } + } + Assertions.assertEquals(2, readed); + } + + // batch upsert + try (PreparedStatement ps = conn.prepareStatement(UPSERT_ROW)) { + for (int j = 0; j < 2000; j++) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.addBatch(); + } + ps.executeBatch(); + + // single row upsert + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.execute(); + + for (int j = 0; j < 2000; j++) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.addBatch(); + } + ps.executeBatch(); + } + + // batch inserts + try (PreparedStatement ps = conn.prepareStatement(INSERT_ROW)) { + for (int j = 0; j < 2000; j++) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.addBatch(); + } + ps.executeBatch(); + + // single row insert + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.execute(); + + for (int j = 0; j < 2000; j++) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.addBatch(); + } + ps.executeBatch(); + } + + // read all + try (Statement st = conn.createStatement()) { + int readed = 0; + try (ResultSet rs = st.executeQuery(SELECT_ALL)) { + while (rs.next()) { + readed++; + Assertions.assertEquals(readed, rs.getInt("id")); + Assertions.assertEquals(prefix + readed, rs.getString("value")); + Assertions.assertEquals(Date.valueOf(ld.plusDays(readed)), rs.getDate("date")); + } + } + Assertions.assertEquals(8004, readed); + } + + // single update + try (PreparedStatement ps = conn.prepareStatement(UPDATE_ROW)) { + ps.setString(1, "updated-value"); + ps.setInt(2, 1); + ps.executeUpdate(); + } + + // single delete + try (PreparedStatement ps = conn.prepareStatement(DELETE_ROW)) { + ps.setInt(1, 2); + ps.executeUpdate(); + } + } + } } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java index 995d62a..bc1eb65 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java @@ -879,15 +879,15 @@ private String createPayload(Random rnd, int length) { @Timeout(value = 30, unit = TimeUnit.SECONDS, threadMode = Timeout.ThreadMode.SAME_THREAD) public void testBigBulkAndScan() throws SQLException { String bulkUpsert = QUERIES.upsertOne(SqlQueries.JdbcQuery.BULK, "c_Text", "Text?"); - String scanSelectAll = QUERIES.scanSelectSQL(); + String selectAll = QUERIES.selectSQL(); String selectOne = QUERIES.selectAllByKey("?"); Random rnd = new Random(0x234567); int payloadLength = 1000; - try { + try (Connection conn = jdbc.createCustomConnection("useStreamResultSets", "true")) { // BULK UPSERT - try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) { + try (PreparedStatement ps = conn.prepareStatement(bulkUpsert)) { for (int idx = 1; idx <= 10000; idx++) { ps.setInt(1, idx); String payload = createPayload(rnd, payloadLength); @@ -901,7 +901,7 @@ public void testBigBulkAndScan() throws SQLException { } // SCAN all table - try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + try (PreparedStatement ps = conn.prepareStatement(selectAll)) { int readed = 0; Assertions.assertTrue(ps.execute()); try (ResultSet rs = ps.getResultSet()) { @@ -915,7 +915,7 @@ public void testBigBulkAndScan() throws SQLException { } // Canceled scan - try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + try (PreparedStatement ps = conn.prepareStatement(selectAll)) { Assertions.assertTrue(ps.execute()); ps.getResultSet().next(); ps.getResultSet().close(); @@ -933,7 +933,7 @@ public void testBigBulkAndScan() throws SQLException { } // Scan was cancelled, but connection still work - try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) { + try (PreparedStatement ps = conn.prepareStatement(selectOne)) { ps.setInt(1, 1234); Assertions.assertTrue(ps.execute()); diff --git a/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java b/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java index ae52b9a..31f1388 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java @@ -318,6 +318,7 @@ static DriverPropertyInfo[] defaultPropertyInfo(@Nullable String localDatacenter new DriverPropertyInfo("sessionMaxIdleTime", ""), new DriverPropertyInfo("sessionPoolSizeMin", ""), new DriverPropertyInfo("sessionPoolSizeMax", ""), + new DriverPropertyInfo("useStreamResultSets", "false"), new DriverPropertyInfo("joinDuration", "5m"), new DriverPropertyInfo("queryTimeout", "0s"), new DriverPropertyInfo("scanQueryTimeout", "5m"), @@ -358,6 +359,7 @@ static DriverPropertyInfo[] customizedPropertyInfo() { new DriverPropertyInfo("sessionMaxIdleTime", "5m"), new DriverPropertyInfo("sessionPoolSizeMin", "3"), new DriverPropertyInfo("sessionPoolSizeMax", "4"), + new DriverPropertyInfo("useStreamResultSets", "true"), new DriverPropertyInfo("joinDuration", "6m"), new DriverPropertyInfo("queryTimeout", "2m"), new DriverPropertyInfo("scanQueryTimeout", "3m"),