diff --git a/docs/en/deploy/conf.md b/docs/en/deploy/conf.md index 11667427247..138a414fa3d 100644 --- a/docs/en/deploy/conf.md +++ b/docs/en/deploy/conf.md @@ -9,6 +9,8 @@ # If you are deploying the standalone version, you do not need to configure zk_cluster and zk_root_path, just comment these two configurations. Deploying the cluster version needs to configure these two items, and the two configurations of all nodes in a cluster must be consistent #--zk_cluster=127.0.0.1:7181 #--zk_root_path=/openmldb_cluster +# set the username and password of zookeeper if authentication is enabled +#--zk_cert=user:passwd # The address of the tablet needs to be specified in the standalone version, and this configuration can be ignored in the cluster version --tablet=127.0.0.1:9921 # Configure log directory @@ -76,6 +78,8 @@ # If you start the cluster version, you need to specify the address of zk and the node path of the cluster in zk #--zk_cluster=127.0.0.1:7181 #--zk_root_path=/openmldb_cluster +# set the username and password of zookeeper if authentication is enabled +#--zk_cert=user:passwd # Configure the thread pool size, it is recommended to be consistent with the number of CPU cores --thread_pool_size=24 @@ -218,6 +222,8 @@ # If the deployed openmldb is a cluster version, you need to specify the zk address and the cluster zk node directory #--zk_cluster=127.0.0.1:7181 #--zk_root_path=/openmldb_cluster +# set the username and password of zookeeper if authentication is enabled +#--zk_cert=user:passwd # configure log path --openmldb_log_dir=./logs @@ -249,6 +255,7 @@ zookeeper.connection_timeout=5000 zookeeper.max_retries=10 zookeeper.base_sleep_time=1000 zookeeper.max_connect_waitTime=30000 +#zookeeper.cert=user:passwd # Spark Config spark.home= diff --git a/docs/zh/deploy/conf.md b/docs/zh/deploy/conf.md index ef05f0c8dc9..de538720e5d 100644 --- a/docs/zh/deploy/conf.md +++ b/docs/zh/deploy/conf.md @@ -9,6 +9,8 @@ # 如果是部署单机版不需要配置zk_cluster和zk_root_path,把这俩配置注释即可. 部署集群版需要配置这两项,一个集群中所有节点的这两个配置必须保持一致 #--zk_cluster=127.0.0.1:7181 #--zk_root_path=/openmldb_cluster +# 配置zk认证的用户名和密码, 用冒号分割 +#--zk_cert=user:passwd # 单机版需要指定tablet的地址, 集群版此配置可忽略 --tablet=127.0.0.1:9921 # 配置log目录 @@ -76,6 +78,8 @@ # 如果启动集群版需要指定zk的地址和集群在zk的节点路径 #--zk_cluster=127.0.0.1:7181 #--zk_root_path=/openmldb_cluster +# 配置zk认证的用户名和密码, 用冒号分割 +#--zk_cert=user:passwd # 配置线程池大小,建议和cpu核数一致 --thread_pool_size=24 @@ -222,6 +226,8 @@ # 如果部署的openmldb是集群版,需要指定zk地址和集群zk节点目录 #--zk_cluster=127.0.0.1:7181 #--zk_root_path=/openmldb_cluster +# 配置zk认证的用户名和密码, 用冒号分割 +#--zk_cert=user:passwd # 配置日志路径 --openmldb_log_dir=./logs @@ -254,6 +260,7 @@ zookeeper.connection_timeout=5000 zookeeper.max_retries=10 zookeeper.base_sleep_time=1000 zookeeper.max_connect_waitTime=30000 +#zookeeper.cert=user:passwd # Spark Config spark.home= diff --git a/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/zk/ZKClient.java b/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/zk/ZKClient.java index 256174c6573..85a1cf0422d 100644 --- a/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/zk/ZKClient.java +++ b/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/zk/ZKClient.java @@ -20,8 +20,11 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; import java.util.concurrent.TimeUnit; import java.util.List; @@ -46,12 +49,26 @@ public CuratorFramework getClient() { public boolean connect() throws InterruptedException { log.info("ZKClient connect with config: {}", config); RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTime(), config.getMaxRetries()); - CuratorFramework client = CuratorFrameworkFactory.builder() + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(config.getCluster()) .sessionTimeoutMs(config.getSessionTimeout()) .connectionTimeoutMs(config.getConnectionTimeout()) - .retryPolicy(retryPolicy) - .build(); + .retryPolicy(retryPolicy); + if (!config.getCert().isEmpty()) { + builder.authorization("digest", config.getCert().getBytes()) + .aclProvider(new ACLProvider() { + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(String s) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + CuratorFramework client = builder.build(); client.start(); if (!client.blockUntilConnected(config.getMaxConnectWaitTime(), TimeUnit.MILLISECONDS)) { return false; diff --git a/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/zk/ZKConfig.java b/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/zk/ZKConfig.java index e215533a483..f0721a2f256 100644 --- a/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/zk/ZKConfig.java +++ b/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/zk/ZKConfig.java @@ -32,5 +32,7 @@ public class ZKConfig { private int baseSleepTime = 1000; @Builder.Default private int maxConnectWaitTime = 30000; + @Builder.Default + private String cert = ""; } diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java index 830f6d1f097..83dd73cf657 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java @@ -33,6 +33,7 @@ public class SdkOption { private String sparkConfPath = ""; private int zkLogLevel = 3; private String zkLogFile = ""; + private String zkCert = ""; // options for standalone mode private String host = ""; @@ -70,6 +71,7 @@ public SQLRouterOptions buildSQLRouterOptions() throws SqlException { copt.setSpark_conf_path(getSparkConfPath()); copt.setZk_log_level(getZkLogLevel()); copt.setZk_log_file(getZkLogFile()); + copt.setZk_cert(getZkCert()); // base buildBaseOptions(copt); diff --git a/java/openmldb-synctool/src/main/java/com/_4paradigm/openmldb/synctool/SyncToolConfig.java b/java/openmldb-synctool/src/main/java/com/_4paradigm/openmldb/synctool/SyncToolConfig.java index 26680f85c17..4fdb22834db 100644 --- a/java/openmldb-synctool/src/main/java/com/_4paradigm/openmldb/synctool/SyncToolConfig.java +++ b/java/openmldb-synctool/src/main/java/com/_4paradigm/openmldb/synctool/SyncToolConfig.java @@ -37,6 +37,7 @@ public class SyncToolConfig { // public static int CHANNEL_KEEP_ALIVE_TIME; public static String ZK_CLUSTER; public static String ZK_ROOT_PATH; + public static String ZK_CERT; public static String SYNC_TASK_PROGRESS_PATH; public static String HADOOP_CONF_DIR; @@ -86,6 +87,7 @@ private static void parseFromProperties(Properties prop) { if (ZK_ROOT_PATH.isEmpty()) { throw new RuntimeException("zookeeper.root_path should not be empty"); } + ZK_CERT = prop.getProperty("zookeeper.cert", ""); HADOOP_CONF_DIR = prop.getProperty("hadoop.conf.dir", ""); if (HADOOP_CONF_DIR.isEmpty()) { diff --git a/java/openmldb-synctool/src/main/java/com/_4paradigm/openmldb/synctool/SyncToolImpl.java b/java/openmldb-synctool/src/main/java/com/_4paradigm/openmldb/synctool/SyncToolImpl.java index f63ff2ae406..0e98cffa6f3 100644 --- a/java/openmldb-synctool/src/main/java/com/_4paradigm/openmldb/synctool/SyncToolImpl.java +++ b/java/openmldb-synctool/src/main/java/com/_4paradigm/openmldb/synctool/SyncToolImpl.java @@ -85,11 +85,13 @@ public SyncToolImpl(String endpoint) throws SqlException, InterruptedException { this.zkClient = new ZKClient(ZKConfig.builder() .cluster(SyncToolConfig.ZK_CLUSTER) .namespace(SyncToolConfig.ZK_ROOT_PATH) + .cert(SyncToolConfig.ZK_CERT) .build()); Preconditions.checkState(zkClient.connect(), "zk connect failed"); SdkOption option = new SdkOption(); option.setZkCluster(SyncToolConfig.ZK_CLUSTER); option.setZkPath(SyncToolConfig.ZK_ROOT_PATH); + option.setZkCert(SyncToolConfig.ZK_CERT); this.router = new SqlClusterExecutor(option); this.zkCollectorPath = SyncToolConfig.ZK_ROOT_PATH + "/sync_tool/collector"; diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/client/TaskManagerClient.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/client/TaskManagerClient.java index ad4bc157b6e..309154233f8 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/client/TaskManagerClient.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/client/TaskManagerClient.java @@ -30,9 +30,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import java.util.ArrayList; import java.util.HashMap; @@ -59,16 +62,34 @@ public TaskManagerClient(String endpoint) { } public TaskManagerClient(String zkCluster, String zkPath) throws Exception { + this(zkCluster, zkPath, ""); + } + + public TaskManagerClient(String zkCluster, String zkPath, String zkCert) throws Exception { if (zkCluster == null || zkPath == null) { logger.info("Zookeeper address is wrong, please check the configuration"); } String masterZnode = zkPath + "/taskmanager/leader"; - zkClient = CuratorFrameworkFactory.builder() + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(zkCluster) .sessionTimeoutMs(10000) - .retryPolicy(new ExponentialBackoffRetry(1000, 10)) - .build(); + .retryPolicy(new ExponentialBackoffRetry(1000, 10)); + if (!zkCert.isEmpty()) { + builder.authorization("digest", zkCert.getBytes()) + .aclProvider(new ACLProvider() { + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(String s) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + zkClient = builder.build(); zkClient.start(); Stat stat = zkClient.checkExists().forPath(masterZnode); if (stat != null) { // The original master exists and is directly connected to it. diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java index 76642ff17d6..784756ba726 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java @@ -101,6 +101,10 @@ public static String getZkRootPath() { return getString("zookeeper.root_path"); } + public static String getZkCert() { + return props.getProperty("zookeeper.cert", ""); + } + public static int getZkConnectionTimeout() { return getInt("zookeeper.connection_timeout"); } diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java index 6fd43d4200c..695338925d8 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java @@ -80,6 +80,7 @@ private void initExternalFunction() throws InterruptedException { .connectionTimeout(TaskManagerConfig.getZkConnectionTimeout()) .maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime()) .maxRetries(TaskManagerConfig.getZkMaxRetries()) + .cert(TaskManagerConfig.getZkCert()) .build()); zkClient.connect(); diff --git a/python/openmldb_sdk/openmldb/sdk/sdk.py b/python/openmldb_sdk/openmldb/sdk/sdk.py index bc8454039b4..e079f77c5d3 100644 --- a/python/openmldb_sdk/openmldb/sdk/sdk.py +++ b/python/openmldb_sdk/openmldb/sdk/sdk.py @@ -52,6 +52,8 @@ def init(self): options.zk_log_level = int(self.options_map['zkLogLevel']) if 'zkLogFile' in self.options_map: options.zk_log_file = self.options_map['zkLogFile'] + if 'zkCert' in self.options_map: + options.zk_cert = self.options_map['zkCert'] else: options = sql_router_sdk.StandaloneOptions() # use host diff --git a/release/conf/apiserver.flags.template b/release/conf/apiserver.flags.template index 539bcc8e4a4..5429b305c3a 100644 --- a/release/conf/apiserver.flags.template +++ b/release/conf/apiserver.flags.template @@ -3,6 +3,7 @@ --role=apiserver --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb +#--zk_cert=user:passwd --openmldb_log_dir=./logs --log_level=info diff --git a/release/conf/nameserver.flags.template b/release/conf/nameserver.flags.template index 445833d194a..b738503bfcc 100644 --- a/release/conf/nameserver.flags.template +++ b/release/conf/nameserver.flags.template @@ -3,6 +3,7 @@ --role=nameserver --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb +#--zk_cert=user:passwd --openmldb_log_dir=./logs --log_level=info diff --git a/release/conf/tablet.flags.template b/release/conf/tablet.flags.template index 3d126d74123..29e0bd7d374 100644 --- a/release/conf/tablet.flags.template +++ b/release/conf/tablet.flags.template @@ -6,6 +6,7 @@ --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb +#--zk_cert=user:passwd # thread_pool_size建议和cpu核数一致 --thread_pool_size=24 diff --git a/src/cmd/openmldb.cc b/src/cmd/openmldb.cc index d132190f588..8c07cb252da 100644 --- a/src/cmd/openmldb.cc +++ b/src/cmd/openmldb.cc @@ -62,6 +62,8 @@ DECLARE_string(nameserver); DECLARE_int32(port); DECLARE_string(zk_cluster); DECLARE_string(zk_root_path); +DECLARE_string(zk_auth_schema); +DECLARE_string(zk_cert); DECLARE_int32(thread_pool_size); DECLARE_int32(put_concurrency_limit); DECLARE_int32(get_concurrency_limit); @@ -3653,7 +3655,7 @@ void StartNsClient() { std::shared_ptr<::openmldb::zk::ZkClient> zk_client; if (!FLAGS_zk_cluster.empty()) { zk_client = std::make_shared<::openmldb::zk::ZkClient>(FLAGS_zk_cluster, "", - FLAGS_zk_session_timeout, "", FLAGS_zk_root_path); + FLAGS_zk_session_timeout, "", FLAGS_zk_root_path, FLAGS_zk_auth_schema, FLAGS_zk_cert); if (!zk_client->Init()) { std::cout << "zk client init failed" << std::endl; return; @@ -3876,6 +3878,8 @@ void StartAPIServer() { cluster_options.zk_cluster = FLAGS_zk_cluster; cluster_options.zk_path = FLAGS_zk_root_path; cluster_options.zk_session_timeout = FLAGS_zk_session_timeout; + cluster_options.zk_auth_schema = FLAGS_zk_auth_schema; + cluster_options.zk_cert = FLAGS_zk_cert; if (!api_service->Init(cluster_options)) { PDLOG(WARNING, "Fail to init"); exit(1); diff --git a/src/cmd/sql_cmd.h b/src/cmd/sql_cmd.h index 2d941c65a35..6b8eae72afb 100644 --- a/src/cmd/sql_cmd.h +++ b/src/cmd/sql_cmd.h @@ -41,6 +41,8 @@ DEFINE_string(spark_conf, "", "The config file of Spark job"); // cluster mode DECLARE_string(zk_cluster); DECLARE_string(zk_root_path); +DECLARE_string(zk_auth_schema); +DECLARE_string(zk_cert); DECLARE_int32(zk_session_timeout); DECLARE_uint32(zk_log_level); DECLARE_string(zk_log_file); @@ -267,6 +269,8 @@ bool InitClusterSDK() { copt.zk_session_timeout = FLAGS_zk_session_timeout; copt.zk_log_level = FLAGS_zk_log_level; copt.zk_log_file = FLAGS_zk_log_file; + copt.zk_auth_schema = FLAGS_zk_auth_schema; + copt.zk_cert = FLAGS_zk_cert; cs = new ::openmldb::sdk::ClusterSDK(copt); if (!cs->Init()) { diff --git a/src/datacollector/data_collector.cc b/src/datacollector/data_collector.cc index 8cf02a3ab2b..cb1a8f254e2 100644 --- a/src/datacollector/data_collector.cc +++ b/src/datacollector/data_collector.cc @@ -33,6 +33,8 @@ DECLARE_string(zk_cluster); DECLARE_string(zk_root_path); +DECLARE_string(zk_auth_schema); +DECLARE_string(zk_cert); DECLARE_int32(thread_pool_size); DECLARE_int32(zk_session_timeout); DECLARE_int32(zk_keep_alive_check_interval); @@ -179,7 +181,8 @@ bool DataCollectorImpl::Init(const std::string& endpoint) { } bool DataCollectorImpl::Init(const std::string& zk_cluster, const std::string& zk_path, const std::string& endpoint) { zk_client_ = std::make_shared(zk_cluster, FLAGS_zk_session_timeout, endpoint, zk_path, - zk_path + kDataCollectorRegisterPath); + zk_path + kDataCollectorRegisterPath, + FLAGS_zk_auth_schema, FLAGS_zk_cert); if (!zk_client_->Init()) { LOG(WARNING) << "fail to init zk client"; return false; diff --git a/src/flags.cc b/src/flags.cc index bed34c0150d..42e085781eb 100644 --- a/src/flags.cc +++ b/src/flags.cc @@ -30,6 +30,8 @@ DEFINE_uint32(tablet_heartbeat_timeout, 5 * 60 * 1000, "config the heartbeat of DEFINE_uint32(tablet_offline_check_interval, 1000, "config the check interval of tablet offline. unit is milliseconds"); DEFINE_string(zk_cluster, "", "config the zookeeper cluster eg ip:2181,ip2:2181,ip3:2181"); DEFINE_string(zk_root_path, "/openmldb", "config the root path of zookeeper"); +DEFINE_string(zk_auth_schema, "digest", "config the id of authentication schema"); +DEFINE_string(zk_cert, "", "config the application credentials"); DEFINE_string(tablet, "", "config the endpoint of tablet"); DEFINE_string(nameserver, "", "config the endpoint of nameserver"); DEFINE_int32(zk_keep_alive_check_interval, 15000, "config the interval of keep alive check. unit is milliseconds"); diff --git a/src/nameserver/cluster_info.cc b/src/nameserver/cluster_info.cc index de30fc8d18f..ec685ce8b3f 100644 --- a/src/nameserver/cluster_info.cc +++ b/src/nameserver/cluster_info.cc @@ -94,7 +94,8 @@ void ClusterInfo::UpdateNSClient(const std::vector& children) { int ClusterInfo::Init(std::string& msg) { zk_client_ = std::make_shared<::openmldb::zk::ZkClient>(cluster_add_.zk_endpoints(), FLAGS_zk_session_timeout, "", - cluster_add_.zk_path(), cluster_add_.zk_path() + "/leader"); + cluster_add_.zk_path(), cluster_add_.zk_path() + "/leader", + cluster_add_.zk_auth_schema(), cluster_add_.zk_cert()); bool ok = zk_client_->Init(); for (int i = 1; i < 3; i++) { if (ok) { diff --git a/src/nameserver/name_server_create_remote_test.cc b/src/nameserver/name_server_create_remote_test.cc index 0075999b645..def3d1d0a07 100644 --- a/src/nameserver/name_server_create_remote_test.cc +++ b/src/nameserver/name_server_create_remote_test.cc @@ -43,8 +43,6 @@ DECLARE_uint32(name_server_task_max_concurrency); DECLARE_uint32(system_table_replica_num); DECLARE_bool(auto_failover); -using ::openmldb::zk::ZkClient; - namespace openmldb { namespace nameserver { diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index 862ee42d320..c76054d622b 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -51,6 +51,8 @@ DECLARE_string(endpoint); DECLARE_string(zk_cluster); DECLARE_string(zk_root_path); +DECLARE_string(zk_auth_schema); +DECLARE_string(zk_cert); DECLARE_string(tablet); DECLARE_int32(zk_session_timeout); DECLARE_int32(zk_keep_alive_check_interval); @@ -1411,7 +1413,8 @@ bool NameServerImpl::Init(const std::string& zk_cluster, const std::string& zk_p zone_info_.set_replica_alias(""); zone_info_.set_zone_term(1); LOG(INFO) << "zone name " << zone_info_.zone_name(); - zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path); + zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path, + FLAGS_zk_auth_schema, FLAGS_zk_cert); if (!zk_client_->Init()) { PDLOG(WARNING, "fail to init zookeeper with cluster[%s]", zk_cluster.c_str()); return false; diff --git a/src/nameserver/name_server_test.cc b/src/nameserver/name_server_test.cc index f1ad0f86eab..eee5d79f351 100644 --- a/src/nameserver/name_server_test.cc +++ b/src/nameserver/name_server_test.cc @@ -38,6 +38,8 @@ DECLARE_string(ssd_root_path); DECLARE_string(hdd_root_path); DECLARE_string(zk_cluster); DECLARE_string(zk_root_path); +DECLARE_string(zk_auth_schema); +DECLARE_string(zk_cert); DECLARE_int32(zk_session_timeout); DECLARE_int32(request_timeout_ms); DECLARE_int32(zk_keep_alive_check_interval); @@ -171,7 +173,8 @@ TEST_P(NameServerImplTest, MakesnapshotTask) { sleep(5); - ZkClient zk_client(FLAGS_zk_cluster, "", 1000, FLAGS_endpoint, FLAGS_zk_root_path); + ZkClient zk_client(FLAGS_zk_cluster, "", 1000, FLAGS_endpoint, FLAGS_zk_root_path, + FLAGS_zk_auth_schema, FLAGS_zk_cert); ok = zk_client.Init(); ASSERT_TRUE(ok); std::string op_index_node = FLAGS_zk_root_path + "/op/op_index"; diff --git a/src/nameserver/new_server_env_test.cc b/src/nameserver/new_server_env_test.cc index e05d1bc509c..405e3f436e0 100644 --- a/src/nameserver/new_server_env_test.cc +++ b/src/nameserver/new_server_env_test.cc @@ -34,6 +34,8 @@ DECLARE_string(endpoint); DECLARE_string(db_root_path); DECLARE_string(zk_cluster); DECLARE_string(zk_root_path); +DECLARE_string(zk_auth_schema); +DECLARE_string(zk_cert); DECLARE_int32(zk_session_timeout); DECLARE_int32(request_timeout_ms); DECLARE_int32(request_timeout_ms); @@ -108,7 +110,8 @@ void SetSdkEndpoint(::openmldb::RpcClient<::openmldb::nameserver::NameServer_Stu void ShowNameServer(std::map* map) { std::shared_ptr<::openmldb::zk::ZkClient> zk_client; - zk_client = std::make_shared<::openmldb::zk::ZkClient>(FLAGS_zk_cluster, "", 1000, "", FLAGS_zk_root_path); + zk_client = std::make_shared<::openmldb::zk::ZkClient>(FLAGS_zk_cluster, "", 1000, "", FLAGS_zk_root_path, + FLAGS_zk_auth_schema, FLAGS_zk_cert); if (!zk_client->Init()) { ASSERT_TRUE(false); } diff --git a/src/proto/name_server.proto b/src/proto/name_server.proto index b0eb526d8e7..08383b4f7c0 100755 --- a/src/proto/name_server.proto +++ b/src/proto/name_server.proto @@ -365,6 +365,8 @@ message ClusterAddress { optional string zk_endpoints = 1; optional string zk_path = 2; optional string alias = 3; + optional string zk_auth_schema = 4; + optional string zk_cert = 5; } message GeneralRequest {} diff --git a/src/sdk/db_sdk.cc b/src/sdk/db_sdk.cc index c04e86d4f03..0f551853740 100644 --- a/src/sdk/db_sdk.cc +++ b/src/sdk/db_sdk.cc @@ -207,7 +207,9 @@ void ClusterSDK::CheckZk() { bool ClusterSDK::Init() { zk_client_ = new ::openmldb::zk::ZkClient(options_.zk_cluster, "", options_.zk_session_timeout, "", - options_.zk_path); + options_.zk_path, + options_.zk_auth_schema, + options_.zk_cert); bool ok = zk_client_->Init(options_.zk_log_level, options_.zk_log_file); if (!ok) { diff --git a/src/sdk/db_sdk.h b/src/sdk/db_sdk.h index 71e3e321241..c6d2cfbab76 100644 --- a/src/sdk/db_sdk.h +++ b/src/sdk/db_sdk.h @@ -43,11 +43,14 @@ struct ClusterOptions { int32_t zk_session_timeout = 2000; int32_t zk_log_level = 3; std::string zk_log_file; + std::string zk_auth_schema = "digest"; + std::string zk_cert; std::string to_string() { std::stringstream ss; ss << "zk options [cluster:" << zk_cluster << ", path:" << zk_path << ", zk_session_timeout:" << zk_session_timeout - << ", log_level:" << zk_log_level << ", log_file:" << zk_log_file << "]"; + << ", log_level:" << zk_log_level << ", log_file:" << zk_log_file + << ", zk_auth_schema:" << zk_auth_schema << ", zk_cert:" << zk_cert << "]"; return ss.str(); } }; diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 25b51991da6..2556eac681e 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -258,6 +258,8 @@ bool SQLClusterRouter::Init() { coptions.zk_session_timeout = ops->zk_session_timeout; coptions.zk_log_level = ops->zk_log_level; coptions.zk_log_file = ops->zk_log_file; + coptions.zk_auth_schema = ops->zk_auth_schema; + coptions.zk_cert = ops->zk_cert; cluster_sdk_ = new ClusterSDK(coptions); // TODO(hw): no detail error info bool ok = cluster_sdk_->Init(); diff --git a/src/sdk/sql_router.h b/src/sdk/sql_router.h index f88cc0b00f9..68186a83b00 100644 --- a/src/sdk/sql_router.h +++ b/src/sdk/sql_router.h @@ -58,6 +58,8 @@ struct SQLRouterOptions : BasicRouterOptions { std::string spark_conf_path; uint32_t zk_log_level = 3; // PY/JAVA SDK default info log std::string zk_log_file; + std::string zk_auth_schema = "digest"; + std::string zk_cert; }; struct StandaloneOptions : BasicRouterOptions { diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index bc319c105d7..16958e3aeb2 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -107,6 +107,8 @@ DECLARE_string(zk_cluster); DECLARE_string(zk_root_path); DECLARE_int32(zk_session_timeout); DECLARE_int32(zk_keep_alive_check_interval); +DECLARE_string(zk_auth_schema); +DECLARE_string(zk_cert); DECLARE_int32(binlog_sync_to_disk_interval); DECLARE_int32(binlog_delete_interval); @@ -190,7 +192,8 @@ bool TabletImpl::Init(const std::string& zk_cluster, const std::string& zk_path, deploy_collector_ = std::make_unique<::openmldb::statistics::DeployQueryTimeCollector>(); if (!zk_cluster.empty()) { - zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path); + zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path, + FLAGS_zk_auth_schema, FLAGS_zk_cert); bool ok = zk_client_->Init(); if (!ok) { PDLOG(ERROR, "fail to init zookeeper with cluster %s", zk_cluster.c_str()); diff --git a/src/tablet/tablet_impl_keep_alive_test.cc b/src/tablet/tablet_impl_keep_alive_test.cc index eafd3338b4d..7339ca80607 100644 --- a/src/tablet/tablet_impl_keep_alive_test.cc +++ b/src/tablet/tablet_impl_keep_alive_test.cc @@ -66,7 +66,7 @@ TEST_F(TabletImplTest, KeepAlive) { FLAGS_endpoint = "127.0.0.1:9527"; FLAGS_zk_cluster = "127.0.0.1:6181"; FLAGS_zk_root_path = "/rtidb2"; - ZkClient zk_client(FLAGS_zk_cluster, "", 1000, "test1", FLAGS_zk_root_path); + ZkClient zk_client(FLAGS_zk_cluster, "", 1000, "test1", FLAGS_zk_root_path, "", ""); bool ok = zk_client.Init(); ASSERT_TRUE(ok); ok = zk_client.Mkdir("/rtidb2/nodes"); diff --git a/src/zk/dist_lock_test.cc b/src/zk/dist_lock_test.cc index cf81d44ece2..0bf33604bf0 100644 --- a/src/zk/dist_lock_test.cc +++ b/src/zk/dist_lock_test.cc @@ -43,7 +43,7 @@ void OnLockedCallback() { call_invoked = true; } void OnLostCallback() {} TEST_F(DistLockTest, Lock) { - ZkClient client("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock"); + ZkClient client("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock", "", ""); bool ok = client.Init(); ASSERT_TRUE(ok); DistLock lock("/openmldb_lock/nameserver_lock", &client, boost::bind(&OnLockedCallback), @@ -59,7 +59,7 @@ TEST_F(DistLockTest, Lock) { lock.CurrentLockValue(current_lock); ASSERT_EQ("endpoint1", current_lock); call_invoked = false; - ZkClient client2("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock"); + ZkClient client2("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock", "", ""); ok = client2.Init(); if (!ok) { lock.Stop(); diff --git a/src/zk/zk_client.cc b/src/zk/zk_client.cc index 382ce4c00f2..ecc94c1251c 100644 --- a/src/zk/zk_client.cc +++ b/src/zk/zk_client.cc @@ -64,11 +64,15 @@ void ItemWatcher(zhandle_t* zh, int type, int state, const char* path, void* wat } ZkClient::ZkClient(const std::string& hosts, const std::string& real_endpoint, int32_t session_timeout, - const std::string& endpoint, const std::string& zk_root_path) + const std::string& endpoint, const std::string& zk_root_path, + const std::string& auth_schema, const std::string& cert) : hosts_(hosts), session_timeout_(session_timeout), endpoint_(endpoint), zk_root_path_(zk_root_path), + auth_schema_(auth_schema), + cert_(cert), + acl_vector_(ZOO_OPEN_ACL_UNSAFE), real_endpoint_(real_endpoint), nodes_root_path_(zk_root_path_ + "/nodes"), nodes_watch_callbacks_(), @@ -88,11 +92,15 @@ ZkClient::ZkClient(const std::string& hosts, const std::string& real_endpoint, i } ZkClient::ZkClient(const std::string& hosts, int32_t session_timeout, const std::string& endpoint, - const std::string& zk_root_path, const std::string& zone_path) + const std::string& zk_root_path, const std::string& zone_path, + const std::string& auth_schema, const std::string& cert) : hosts_(hosts), session_timeout_(session_timeout), endpoint_(endpoint), zk_root_path_(zk_root_path), + auth_schema_(auth_schema), + cert_(cert), + acl_vector_(ZOO_OPEN_ACL_UNSAFE), nodes_root_path_(zone_path), nodes_watch_callbacks_(), mu_(), @@ -133,6 +141,14 @@ bool ZkClient::Init(int log_level, const std::string& log_file) { PDLOG(WARNING, "fail to init zk handler with hosts %s, session_timeout %d", hosts_.c_str(), session_timeout_); return false; } + if (!cert_.empty()) { + if (zoo_add_auth(zk_, auth_schema_.c_str(), cert_.data(), cert_.length(), NULL, NULL) != ZOK) { + PDLOG(WARNING, "auth failed. schema: %s cert: %s", auth_schema_.c_str(), cert_.c_str()); + return false; + } + acl_vector_ = ZOO_CREATOR_ALL_ACL; + PDLOG(INFO, "auth ok. schema: %s cert: %s", auth_schema_.c_str(), cert_.c_str()); + } return true; } @@ -173,7 +189,7 @@ bool ZkClient::Register(bool startup_flag) { if (startup_flag) { value = "startup_" + endpoint_; } - int ret = zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0); + int ret = zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &acl_vector_, ZOO_EPHEMERAL, NULL, 0); if (ret == ZOK) { PDLOG(INFO, "register self with endpoint %s ok", endpoint_.c_str()); registed_.store(true, std::memory_order_relaxed); @@ -231,7 +247,7 @@ bool ZkClient::RegisterName() { } PDLOG(WARNING, "set node with name %s value %s failed", sname.c_str(), value.c_str()); } else { - int ret = zoo_create(zk_, name.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); + int ret = zoo_create(zk_, name.c_str(), value.c_str(), value.size(), &acl_vector_, 0, NULL, 0); if (ret == ZOK) { PDLOG(INFO, "register with name %s value %s ok", sname.c_str(), value.c_str()); return true; @@ -281,7 +297,7 @@ bool ZkClient::CreateNode(const std::string& node, const std::string& value, int uint32_t size = node.size() + 11; char path_buffer[size]; // NOLINT int ret = - zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, flags, path_buffer, size); + zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &acl_vector_, flags, path_buffer, size); if (ret == ZOK) { assigned_path_name.assign(path_buffer, size - 1); PDLOG(INFO, "create node %s ok and real node name %s", node.c_str(), assigned_path_name.c_str()); @@ -371,9 +387,11 @@ bool ZkClient::GetNodeValueAndStat(const char* node, std::string* value, Stat* s bool ZkClient::DeleteNode(const std::string& node) { std::lock_guard lock(mu_); - if (zoo_delete(zk_, node.c_str(), -1) == ZOK) { + int ret = zoo_delete(zk_, node.c_str(), -1); + if (ret == ZOK) { return true; } + PDLOG(WARNING, "delete %s failed. error no is %d", node.c_str(), ret); return false; } @@ -597,7 +615,7 @@ bool ZkClient::MkdirNoLock(const std::string& path) { } full_path += *it; index++; - int ret = zoo_create(zk_, full_path.c_str(), "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); + int ret = zoo_create(zk_, full_path.c_str(), "", 0, &acl_vector_, 0, NULL, 0); if (ret == ZNODEEXISTS || ret == ZOK) { continue; } diff --git a/src/zk/zk_client.h b/src/zk/zk_client.h index e06c0de7e6a..344df5753e2 100644 --- a/src/zk/zk_client.h +++ b/src/zk/zk_client.h @@ -46,10 +46,12 @@ class ZkClient { // session_timeout, the session timeout // endpoint, the client endpoint ZkClient(const std::string& hosts, const std::string& real_endpoint, int32_t session_timeout, - const std::string& endpoint, const std::string& zk_root_path); + const std::string& endpoint, const std::string& zk_root_path, + const std::string& auth_schema, const std::string& cert); ZkClient(const std::string& hosts, int32_t session_timeout, const std::string& endpoint, - const std::string& zk_root_path, const std::string& zone_path); + const std::string& zk_root_path, const std::string& zone_path, + const std::string& auth_schema, const std::string& cert); ~ZkClient(); // init zookeeper connections @@ -145,6 +147,9 @@ class ZkClient { int32_t session_timeout_; std::string endpoint_; std::string zk_root_path_; + std::string auth_schema_; + std::string cert_; + struct ACL_vector acl_vector_; std::string real_endpoint_; FILE* zk_log_stream_file_ = NULL; diff --git a/src/zk/zk_client_test.cc b/src/zk/zk_client_test.cc index 0d4ffb5af83..04879c74359 100644 --- a/src/zk/zk_client_test.cc +++ b/src/zk/zk_client_test.cc @@ -49,13 +49,13 @@ void WatchCallback(const std::vector& endpoints) { } TEST_F(ZkClientTest, BadZk) { - ZkClient client("127.0.0.1:13181", "", session_timeout, "127.0.0.1:9527", "/openmldb"); + ZkClient client("127.0.0.1:13181", "", session_timeout, "127.0.0.1:9527", "/openmldb", "", ""); bool ok = client.Init(); ASSERT_FALSE(ok); } TEST_F(ZkClientTest, Init) { - ZkClient client("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9527", "/openmldb"); + ZkClient client("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9527", "/openmldb", "", ""); bool ok = client.Init(); ASSERT_TRUE(ok); ok = client.Register(); @@ -71,7 +71,7 @@ TEST_F(ZkClientTest, Init) { ok = client.WatchNodes(); ASSERT_TRUE(ok); { - ZkClient client2("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9528", "/openmldb"); + ZkClient client2("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9528", "/openmldb", "", ""); ok = client2.Init(); client2.Register(); ASSERT_TRUE(ok); @@ -83,7 +83,7 @@ TEST_F(ZkClientTest, Init) { } TEST_F(ZkClientTest, CreateNode) { - ZkClient client("127.0.0.1:6181", "", 1000, "127.0.0.1:9527", "/openmldb1"); + ZkClient client("127.0.0.1:6181", "", 1000, "127.0.0.1:9527", "/openmldb1", "", ""); bool ok = client.Init(); ASSERT_TRUE(ok); @@ -99,7 +99,7 @@ TEST_F(ZkClientTest, CreateNode) { ret = client.IsExistNode(node); ASSERT_EQ(ret, 0); - ZkClient client2("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9527", "/openmldb1"); + ZkClient client2("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9527", "/openmldb1", "", ""); ok = client2.Init(); ASSERT_TRUE(ok); @@ -109,7 +109,7 @@ TEST_F(ZkClientTest, CreateNode) { } TEST_F(ZkClientTest, ZkNodeChange) { - ZkClient client("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9527", "/openmldb1"); + ZkClient client("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9527", "/openmldb1", "", ""); bool ok = client.Init(); ASSERT_TRUE(ok); @@ -121,7 +121,7 @@ TEST_F(ZkClientTest, ZkNodeChange) { ret = client.IsExistNode(node); ASSERT_EQ(ret, 0); - ZkClient client2("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9527", "/openmldb1"); + ZkClient client2("127.0.0.1:6181", "", session_timeout, "127.0.0.1:9527", "/openmldb1", "", ""); ok = client2.Init(); ASSERT_TRUE(ok); std::atomic detect(false); @@ -146,6 +146,48 @@ TEST_F(ZkClientTest, ZkNodeChange) { ASSERT_TRUE(detect.load()); } +TEST_F(ZkClientTest, Auth) { + std::string node = "/openmldb_auth/node1"; + { + ZkClient client("127.0.0.1:6181", "", 1000, "127.0.0.1:9527", "/openmldb_auth", "digest", "user1:123456"); + bool ok = client.Init(); + ASSERT_TRUE(ok); + + int ret = client.IsExistNode(node); + ASSERT_EQ(ret, 1); + ok = client.CreateNode(node, "value"); + ASSERT_TRUE(ok); + ret = client.IsExistNode(node); + ASSERT_EQ(ret, 0); + } + { + ZkClient client("127.0.0.1:6181", "", 1000, "127.0.0.1:9527", "/openmldb_auth", "", ""); + bool ok = client.Init(); + ASSERT_TRUE(ok); + std::string value; + ASSERT_FALSE(client.GetNodeValue(node, value)); + ASSERT_FALSE(client.CreateNode("/openmldb_auth/node1/dd", "aaa")); + } + { + ZkClient client("127.0.0.1:6181", "", 1000, "127.0.0.1:9527", "/openmldb_auth", "digest", "user1:wrong"); + bool ok = client.Init(); + ASSERT_TRUE(ok); + std::string value; + ASSERT_FALSE(client.GetNodeValue(node, value)); + ASSERT_FALSE(client.CreateNode("/openmldb_auth/node1/dd", "aaa")); + } + { + ZkClient client("127.0.0.1:6181", "", 1000, "127.0.0.1:9527", "/openmldb_auth", "digest", "user1:123456"); + bool ok = client.Init(); + ASSERT_TRUE(ok); + std::string value; + ASSERT_TRUE(client.GetNodeValue(node, value)); + ASSERT_EQ("value", value); + ASSERT_TRUE(client.DeleteNode(node)); + ASSERT_TRUE(client.DeleteNode("/openmldb_auth")); + } +} + } // namespace zk } // namespace openmldb