Skip to content

Commit

Permalink
feat: add light java sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
denglong committed Nov 29, 2023
1 parent 5e225da commit e346253
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class SdkOption {
private int glogLevel = 0;
private String glogDir = "";
private int maxSqlCacheSize = 50;
private Boolean isLight = false;

private void buildBaseOptions(BasicRouterOptions opt) {
opt.setEnable_debug(getEnableDebug());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,28 @@ public class SqlClusterExecutor implements SqlExecutor {
private static final AtomicBoolean initialized = new AtomicBoolean(false);
private SQLRouter sqlRouter;
private DeploymentManager deploymentManager;
private ZKClient zkClient;
private InsertPreparedStatementCache insertCache;

public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlException {
initJavaSdkLibrary(libraryPath);

ZKClient zkClient = null;
if (option.isClusterMode()) {
SQLRouterOptions sqlOpt = option.buildSQLRouterOptions();
this.sqlRouter = sql_router_sdk.NewClusterSQLRouter(sqlOpt);
sqlOpt.delete();
zkClient = new ZKClient(ZKConfig.builder()
.cluster(option.getZkCluster())
.namespace(option.getZkPath())
.sessionTimeout((int)option.getSessionTimeout())
.build());
try {
if (!zkClient.connect()) {
throw new SqlException("zk client connect failed.");
if (!option.getIsLight()) {
zkClient = new ZKClient(ZKConfig.builder()
.cluster(option.getZkCluster())
.namespace(option.getZkPath())
.sessionTimeout((int)option.getSessionTimeout())
.build());
try {
if (!zkClient.connect()) {
throw new SqlException("zk client connect failed.");
}
} catch (Exception e) {
throw new SqlException("init zk client failed. " + e.getMessage());
}
} catch (Exception e) {
throw new SqlException("init zk client failed. " + e.getMessage());
}
} else {
StandaloneOptions sqlOpt = option.buildStandaloneOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
option = new SdkOption();
option.setZkCluster(zkCluster);
option.setZkPath(zkPath);
option.setIsLight(true);
String timeout = options.get("sessionTimeout");
if (timeout != null) {
option.setSessionTimeout(Integer.parseInt(timeout));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon
SdkOption option = new SdkOption();
option.setZkCluster(config.zkCluster);
option.setZkPath(config.zkPath);
option.setIsLight(true);
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public OpenmldbDataWriter(OpenmldbWriteConfig config, int partitionId, long task
SdkOption option = new SdkOption();
option.setZkCluster(config.zkCluster);
option.setZkPath(config.zkPath);
option.setIsLight(true);
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class OpenmldbPartitionReader(config: OpenmldbReadConfig) extends PartitionReade
val option = new SdkOption
option.setZkCluster(config.zkCluster)
option.setZkPath(config.zkPath)
option.setIsLight(true)
val executor = new SqlClusterExecutor(option)
val dbName: String = config.dbName
val tableName: String = config.tableName
Expand Down

0 comments on commit e346253

Please sign in to comment.