From e346253ae4b0bf93080d61cb340e50844f695d06 Mon Sep 17 00:00:00 2001 From: denglong Date: Wed, 29 Nov 2023 18:38:04 +0800 Subject: [PATCH 1/7] feat: add light java sdk --- .../_4paradigm/openmldb/sdk/SdkOption.java | 1 + .../openmldb/sdk/impl/SqlClusterExecutor.java | 25 ++++++++++--------- .../openmldb/spark/OpenmldbSource.java | 1 + .../spark/write/OpenmldbDataSingleWriter.java | 1 + .../spark/write/OpenmldbDataWriter.java | 1 + .../spark/read/OpenmldbPartitionReader.scala | 1 + 6 files changed, 18 insertions(+), 12 deletions(-) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java index 83dd73cf657..c5defdc692c 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java @@ -45,6 +45,7 @@ public class SdkOption { private int glogLevel = 0; private String glogDir = ""; private int maxSqlCacheSize = 50; + private Boolean isLight = false; private void buildBaseOptions(BasicRouterOptions opt) { opt.setEnable_debug(getEnableDebug()); diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java index 9505cd6aba9..a9562a2358f 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java @@ -62,27 +62,28 @@ public class SqlClusterExecutor implements SqlExecutor { private static final AtomicBoolean initialized = new AtomicBoolean(false); private SQLRouter sqlRouter; private DeploymentManager deploymentManager; - private ZKClient zkClient; private InsertPreparedStatementCache insertCache; public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlException { initJavaSdkLibrary(libraryPath); - + ZKClient zkClient = null; if (option.isClusterMode()) { SQLRouterOptions sqlOpt = option.buildSQLRouterOptions(); this.sqlRouter = sql_router_sdk.NewClusterSQLRouter(sqlOpt); sqlOpt.delete(); - zkClient = new ZKClient(ZKConfig.builder() - .cluster(option.getZkCluster()) - .namespace(option.getZkPath()) - .sessionTimeout((int)option.getSessionTimeout()) - .build()); - try { - if (!zkClient.connect()) { - throw new SqlException("zk client connect failed."); + if (!option.getIsLight()) { + zkClient = new ZKClient(ZKConfig.builder() + .cluster(option.getZkCluster()) + .namespace(option.getZkPath()) + .sessionTimeout((int)option.getSessionTimeout()) + .build()); + try { + if (!zkClient.connect()) { + throw new SqlException("zk client connect failed."); + } + } catch (Exception e) { + throw new SqlException("init zk client failed. " + e.getMessage()); } - } catch (Exception e) { - throw new SqlException("init zk client failed. " + e.getMessage()); } } else { StandaloneOptions sqlOpt = option.buildStandaloneOptions(); diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java index 5595e313af5..cdd623967a7 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java @@ -53,6 +53,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options) { option = new SdkOption(); option.setZkCluster(zkCluster); option.setZkPath(zkPath); + option.setIsLight(true); String timeout = options.get("sessionTimeout"); if (timeout != null) { option.setSessionTimeout(Integer.parseInt(timeout)); diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java index fd767d4eebb..b08c2011058 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java @@ -44,6 +44,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon SdkOption option = new SdkOption(); option.setZkCluster(config.zkCluster); option.setZkPath(config.zkPath); + option.setIsLight(true); SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java index 3cb7632ae0b..f1ef50554ad 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java @@ -44,6 +44,7 @@ public OpenmldbDataWriter(OpenmldbWriteConfig config, int partitionId, long task SdkOption option = new SdkOption(); option.setZkCluster(config.zkCluster); option.setZkPath(config.zkPath); + option.setIsLight(true); SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; diff --git a/java/openmldb-spark-connector/src/main/scala/com/_4paradigm/openmldb/spark/read/OpenmldbPartitionReader.scala b/java/openmldb-spark-connector/src/main/scala/com/_4paradigm/openmldb/spark/read/OpenmldbPartitionReader.scala index 4e6267c3139..db2b41baeb2 100644 --- a/java/openmldb-spark-connector/src/main/scala/com/_4paradigm/openmldb/spark/read/OpenmldbPartitionReader.scala +++ b/java/openmldb-spark-connector/src/main/scala/com/_4paradigm/openmldb/spark/read/OpenmldbPartitionReader.scala @@ -13,6 +13,7 @@ class OpenmldbPartitionReader(config: OpenmldbReadConfig) extends PartitionReade val option = new SdkOption option.setZkCluster(config.zkCluster) option.setZkPath(config.zkPath) + option.setIsLight(true) val executor = new SqlClusterExecutor(option) val dbName: String = config.dbName val tableName: String = config.tableName From 8ad865633fd95a9e8336a4398e0603fdd8a16987 Mon Sep 17 00:00:00 2001 From: denglong Date: Wed, 29 Nov 2023 18:48:42 +0800 Subject: [PATCH 2/7] test: add test --- .../com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java index 68dc237d1cf..4496d928982 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java @@ -45,6 +45,7 @@ public class SQLRouterSmokeTest { public static SqlExecutor clusterExecutor; + public static SqlExecutor lightClusterExecutor; public static SqlExecutor standaloneExecutor; static { @@ -54,6 +55,8 @@ public class SQLRouterSmokeTest { option.setZkCluster(TestConfig.ZK_CLUSTER); option.setSessionTimeout(200000); clusterExecutor = new SqlClusterExecutor(option); + option.setIsLight(true); + lightClusterExecutor = new SqlClusterExecutor(option); java.sql.Statement state = clusterExecutor.getStatement(); state.execute("SET @@execute_mode='online';"); state.close(); @@ -82,7 +85,7 @@ void testMoreOptions() throws Exception { @DataProvider(name = "executor") public Object[] executor() { - return new Object[] { clusterExecutor, standaloneExecutor }; + return new Object[] { clusterExecutor, lightClusterExecutor, standaloneExecutor }; } @Test(dataProvider = "executor") From 8f22ce8307106b0b85a3bfe6e305816dbb07ffee Mon Sep 17 00:00:00 2001 From: denglong Date: Wed, 29 Nov 2023 18:52:12 +0800 Subject: [PATCH 3/7] refact: type --- .../src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java index c5defdc692c..eca5289bf32 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java @@ -45,7 +45,7 @@ public class SdkOption { private int glogLevel = 0; private String glogDir = ""; private int maxSqlCacheSize = 50; - private Boolean isLight = false; + private boolean isLight = false; private void buildBaseOptions(BasicRouterOptions opt) { opt.setEnable_debug(getEnableDebug()); From 83b93d2781e80875a54524daab067448cb7d85e5 Mon Sep 17 00:00:00 2001 From: denglong Date: Wed, 29 Nov 2023 18:59:28 +0800 Subject: [PATCH 4/7] fix: fix light --- .../com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java index a9562a2358f..cb52da5c480 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java @@ -71,7 +71,7 @@ public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlExcept SQLRouterOptions sqlOpt = option.buildSQLRouterOptions(); this.sqlRouter = sql_router_sdk.NewClusterSQLRouter(sqlOpt); sqlOpt.delete(); - if (!option.getIsLight()) { + if (!option.isLight()) { zkClient = new ZKClient(ZKConfig.builder() .cluster(option.getZkCluster()) .namespace(option.getZkPath()) From d404b2bf6398843d5b2cd2c6877f7055bf779cba Mon Sep 17 00:00:00 2001 From: denglong Date: Wed, 29 Nov 2023 20:01:18 +0800 Subject: [PATCH 5/7] fix: fix compile --- .../java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java | 2 +- .../main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java | 2 +- .../openmldb/spark/write/OpenmldbDataSingleWriter.java | 2 +- .../com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java | 2 +- .../openmldb/spark/read/OpenmldbPartitionReader.scala | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java index 4496d928982..ecfaba9e430 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java @@ -55,7 +55,7 @@ public class SQLRouterSmokeTest { option.setZkCluster(TestConfig.ZK_CLUSTER); option.setSessionTimeout(200000); clusterExecutor = new SqlClusterExecutor(option); - option.setIsLight(true); + option.setLight(true); lightClusterExecutor = new SqlClusterExecutor(option); java.sql.Statement state = clusterExecutor.getStatement(); state.execute("SET @@execute_mode='online';"); diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java index cdd623967a7..978c3cca694 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java @@ -53,7 +53,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options) { option = new SdkOption(); option.setZkCluster(zkCluster); option.setZkPath(zkPath); - option.setIsLight(true); + option.setLight(true); String timeout = options.get("sessionTimeout"); if (timeout != null) { option.setSessionTimeout(Integer.parseInt(timeout)); diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java index b08c2011058..2885aaba70e 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java @@ -44,7 +44,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon SdkOption option = new SdkOption(); option.setZkCluster(config.zkCluster); option.setZkPath(config.zkPath); - option.setIsLight(true); + option.setLight(true); SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java index f1ef50554ad..5da75e99348 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java @@ -44,7 +44,7 @@ public OpenmldbDataWriter(OpenmldbWriteConfig config, int partitionId, long task SdkOption option = new SdkOption(); option.setZkCluster(config.zkCluster); option.setZkPath(config.zkPath); - option.setIsLight(true); + option.setLight(true); SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; diff --git a/java/openmldb-spark-connector/src/main/scala/com/_4paradigm/openmldb/spark/read/OpenmldbPartitionReader.scala b/java/openmldb-spark-connector/src/main/scala/com/_4paradigm/openmldb/spark/read/OpenmldbPartitionReader.scala index db2b41baeb2..d8eeb89e7ab 100644 --- a/java/openmldb-spark-connector/src/main/scala/com/_4paradigm/openmldb/spark/read/OpenmldbPartitionReader.scala +++ b/java/openmldb-spark-connector/src/main/scala/com/_4paradigm/openmldb/spark/read/OpenmldbPartitionReader.scala @@ -13,7 +13,7 @@ class OpenmldbPartitionReader(config: OpenmldbReadConfig) extends PartitionReade val option = new SdkOption option.setZkCluster(config.zkCluster) option.setZkPath(config.zkPath) - option.setIsLight(true) + option.setLight(true) val executor = new SqlClusterExecutor(option) val dbName: String = config.dbName val tableName: String = config.tableName From eb32555f4b36ae52e0281b455929c60eef14cfab Mon Sep 17 00:00:00 2001 From: 4paradigm <4paradigm@denglong.local> Date: Thu, 30 Nov 2023 09:35:34 +0800 Subject: [PATCH 6/7] test: fix online mode --- .../openmldb/jdbc/SQLRouterSmokeTest.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java index ecfaba9e430..bc92f20d3f5 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java @@ -55,11 +55,10 @@ public class SQLRouterSmokeTest { option.setZkCluster(TestConfig.ZK_CLUSTER); option.setSessionTimeout(200000); clusterExecutor = new SqlClusterExecutor(option); + setOnlineMode(clusterExecutor); option.setLight(true); lightClusterExecutor = new SqlClusterExecutor(option); - java.sql.Statement state = clusterExecutor.getStatement(); - state.execute("SET @@execute_mode='online';"); - state.close(); + setOnlineMode(lightClusterExecutor); // create standalone router SdkOption standaloneOption = new SdkOption(); standaloneOption.setHost(TestConfig.HOST); @@ -72,6 +71,16 @@ public class SQLRouterSmokeTest { } } + static void setOnlineMode(SqlExecutor executor) { + java.sql.Statement state = executor.getStatement(); + try { + state.execute("SET @@execute_mode='online';"); + state.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + @Test void testMoreOptions() throws Exception { SdkOption option = new SdkOption(); @@ -131,7 +140,7 @@ public void testSmoke(SqlExecutor router) { // select String select1 = "select * from tsql1010;"; - SQLResultSet rs1 = (SQLResultSet) router .executeSQL(dbname, select1); + SQLResultSet rs1 = (SQLResultSet) router.executeSQL(dbname, select1); Assert.assertEquals(2, rs1.GetInternalSchema().getColumnList().size()); Assert.assertEquals(Types.BIGINT, rs1.GetInternalSchema().getColumnType(0)); From 7ad6ea1bd1da664b220c8d7f94c4328bd8c91313 Mon Sep 17 00:00:00 2001 From: 4paradigm <4paradigm@denglong.local> Date: Mon, 4 Dec 2023 11:29:51 +0800 Subject: [PATCH 7/7] fix: fix comment --- .../java/com/_4paradigm/openmldb/sdk/SqlException.java | 8 ++++++++ .../_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlException.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlException.java index edb30c275de..f97cc1acff9 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlException.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlException.java @@ -20,4 +20,12 @@ public class SqlException extends Exception { public SqlException(String message) { super(message); } + + public SqlException(String message, Throwable cause) { + super(message, cause); + } + + public SqlException(Throwable cause) { + super(cause); + } } diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java index cb52da5c480..3a88fb9489e 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java @@ -82,7 +82,7 @@ public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlExcept throw new SqlException("zk client connect failed."); } } catch (Exception e) { - throw new SqlException("init zk client failed. " + e.getMessage()); + throw new SqlException("init zk client failed", e); } } } else {