From 857cfd6e0891529804fdec55ab6d50378b87a6ac Mon Sep 17 00:00:00 2001 From: 4paradigm <4paradigm@denglong.local> Date: Thu, 30 Nov 2023 18:13:14 +0800 Subject: [PATCH] feat: add insert_memory_limit option --- .../src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java | 2 ++ .../openmldb/spark/write/OpenmldbDataSingleWriter.java | 1 + .../_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java | 2 ++ src/sdk/sql_router.h | 1 + 4 files changed, 6 insertions(+) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java index 83dd73cf657..00045237dd8 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java @@ -45,6 +45,7 @@ public class SdkOption { private int glogLevel = 0; private String glogDir = ""; private int maxSqlCacheSize = 50; + private int insertMemoryUsageLimit = 0; // [0-100], the default value 0 means unlimited private void buildBaseOptions(BasicRouterOptions opt) { opt.setEnable_debug(getEnableDebug()); @@ -52,6 +53,7 @@ private void buildBaseOptions(BasicRouterOptions opt) { opt.setGlog_level(getGlogLevel()); opt.setGlog_dir(getGlogDir()); opt.setMax_sql_cache_size(getMaxSqlCacheSize()); + opt.setInsert_memory_usage_limit(getInsertMemoryUsageLimit()); } public SQLRouterOptions buildSQLRouterOptions() throws SqlException { diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java index fd767d4eebb..56d61f33e66 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java @@ -44,6 +44,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon SdkOption option = new SdkOption(); option.setZkCluster(config.zkCluster); option.setZkPath(config.zkPath); + option.setInsertMemoryUsageLimit(config.memoryLimit); SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java index 89c2d801ca5..ca1161ab110 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java @@ -24,6 +24,7 @@ // Must serializable public class OpenmldbWriteConfig implements Serializable { public final String dbName, tableName, zkCluster, zkPath, writerType; + public final int insertMemoryUsageLimit; public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType) { this.dbName = dbName; @@ -31,6 +32,7 @@ public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, St this.zkCluster = option.getZkCluster(); this.zkPath = option.getZkPath(); this.writerType = writerType; + this.insertMemoryUsageLimit = option.getInsertMemoryUsageLimit(); // TODO(hw): other configs in SdkOption } } diff --git a/src/sdk/sql_router.h b/src/sdk/sql_router.h index 68186a83b00..85cfa03f492 100644 --- a/src/sdk/sql_router.h +++ b/src/sdk/sql_router.h @@ -49,6 +49,7 @@ struct BasicRouterOptions { int glog_level = 0; // empty means to stderr std::string glog_dir = ""; + int insert_memory_usage_limit = 0; }; struct SQLRouterOptions : BasicRouterOptions {