diff --git a/docs/zh/maintain/monitoring.md b/docs/zh/maintain/monitoring.md index 905644c74df..46f6b08d05a 100644 --- a/docs/zh/maintain/monitoring.md +++ b/docs/zh/maintain/monitoring.md @@ -31,10 +31,8 @@ OpenMLDB exporter 是以 Python 实现的 Prometheus exporter,核心是通过 2. 启动 OpenMLDB - 参见 [install_deploy](../deploy/install_deploy.md) 如何搭建 OpenMLDB。组件启动时需要保证有 flag `--enable_status_service=true`, 或者确认启动 flag 文件 (`conf/(tablet|nameserver).flags`) 中有 `--enable_status_service=true`。 + 参见 [install_deploy](../deploy/install_deploy.md) 如何搭建 OpenMLDB。组件启动时需要保证有 flag `--enable_status_service=true`, OpenMLDB启动脚本(无论是sbin或bin)都已配置为true,如果你使用个人方式启动,需要保证启动 flag 文件 (`conf/(tablet|nameserver).flags`) 中有 `--enable_status_service=true`。 - 默认启动脚本 `bin/start.sh` 开启了 server status, 不需要额外配置。 - 3. 注意:合理选择 OpenMLDB 各组件和 OpenMLDB exporter, 以及 Prometheus, Grafana 的绑定 IP 地址,确保 Grafana 可以访问到 Prometheus, 并且 Prometheus,OpenMLDB exporter 和 OpenMLDB 各个组件之间可以相互访问。 ### 部署 OpenMLDB exporter @@ -168,13 +166,6 @@ OpenMLDB 提供了 Prometheus 和 Grafana 配置文件以作参考,详见 [Ope - component status: 集群组件状态 - table status: 数据库表相关信息,如 `rows_count`, `memory_bytes` - - deploy query response time: deployment query 在 tablet 内部的运行时间 - - **除了 deploy query response time 指标外, 成功配置监控之后都可以直接查询到指标. 
Deploy query response time 需要全局变量 `deploy_stats` 开启后才会有数据, 在 OpenMLDB CLI 中输入 SQL:** - - ```sql - SET GLOBAL deploy_stats = 'on'; - ``` 你可以通过 @@ -184,9 +175,27 @@ OpenMLDB 提供了 Prometheus 和 Grafana 配置文件以作参考,详见 [Ope 查看完整 DB-Level 指标和帮助信息。 +由Component-Level 指标经Grafana聚合得到的DB-Level 指标(未单独声明时,time单位为us): + +- deploy query response time: deployment query 在OpenMLDB内部的运行时间,按DB.DEPLOYMENT汇总 + **需要全局变量 `deploy_stats` 开启后才会开始统计, 在 OpenMLDB CLI 中输入 SQL:** + + ```sql + SET GLOBAL deploy_stats = 'on'; + ``` + 然后,还需要执行deployment,才会出现相应的指标。 + 如果SET变量为off,会清空server中的所有deployment指标并停止统计(已被Prometheus抓取的数据不受影响)。 + - count:count类统计值从deploy_stats on时开始统计,不区分请求的成功和失败。 + - latency, qps:这类指标只统计`[current_time - interval, current_time]`时间窗口内的数据,interval由Tablet Server配置项`bvar_dump_interval`配置,默认为75秒。 + +- api server http time: 各API接口的处理耗时(不包含route),只监测接口耗时,不做细粒度区分,目前也不通过Grafana展示,可以通过Prometheus手动查询。目前监测`deployment`、`sp`和`query`三种方法。 + - api server route time: APIServer进行http route的耗时,通常为us级别,一般忽略不计 + +以上聚合指标的获取方式见下文。在组件指标中,deploy query response time关键字为`deployment`,api server http time关键字为`http_method`。如果指标展示不正常,可以查询组件指标定位问题。 + ### 2. 
Component-Level 指标 -OpenMLDB 的相关组件(即 nameserver, tablet, etc), 本身作为 BRPC server,暴露了 [Prometheus 相关指标](https://github.com/apache/incubator-brpc/blob/master/docs/en/bvar.md#export-to-prometheus), 只需要配置 Prometheus server 从对应地址拉取指标即可。对应 `prometheus_example.yml`中 `job_name=openmldb_components` 项: +OpenMLDB 的相关组件(即 nameserver, tablet, etc), 本身作为 BRPC server,暴露了 [Prometheus 相关指标](https://github.com/apache/brpc/blob/master/docs/en/bvar.md#export-to-prometheus), 只需要配置 Prometheus server 从对应地址拉取指标即可。对应 `prometheus_example.yml`中 `job_name=openmldb_components` 项: ```yaml - job_name: openmldb_components @@ -203,6 +212,7 @@ OpenMLDB 的相关组件(即 nameserver, tablet, etc), 本身作为 BRPC serve - BRPC server 进程相关信息 - 对应 BRPC server 定义的 RPC method 相关指标,例如该 RPC 的请求 `count`, `error_count`, `qps` 和 `response_time` + - Deployment 相关指标,分deployment统计,但只统计该tablet上的deployment请求。区别于Grafana做的集群级别整合。 通过 diff --git a/release/conf/apiserver.flags.template b/release/conf/apiserver.flags.template index 539bcc8e4a4..40c5809b7ac 100644 --- a/release/conf/apiserver.flags.template +++ b/release/conf/apiserver.flags.template @@ -8,3 +8,4 @@ --log_level=info #--thread_pool_size=16 +--bvar_max_dump_multi_dimension_metric_number=10 diff --git a/release/conf/tablet.flags.template b/release/conf/tablet.flags.template index 3d126d74123..b284a7a81ca 100644 --- a/release/conf/tablet.flags.template +++ b/release/conf/tablet.flags.template @@ -98,3 +98,5 @@ # turn this option on to export openmldb metric status # --enable_status_service=false +--bvar_max_dump_multi_dimension_metric_number=10 +--bvar_dump_interval=75 diff --git a/src/apiserver/api_server_impl.cc b/src/apiserver/api_server_impl.cc index c24b76c40ce..1f50472dddb 100644 --- a/src/apiserver/api_server_impl.cc +++ b/src/apiserver/api_server_impl.cc @@ -22,11 +22,18 @@ #include #include "apiserver/interface_provider.h" + +#include "absl/cleanup/cleanup.h" #include "brpc/server.h" +#include "butil/time.h" namespace openmldb { namespace apiserver { 
+APIServerImpl::APIServerImpl(const std::string& endpoint) + : md_recorder_("rpc_server_" + endpoint.substr(endpoint.find(":") + 1), "http_method", {"method"}), + provider_("rpc_server_" + endpoint.substr(endpoint.find(":") + 1)) {} + APIServerImpl::~APIServerImpl() = default; bool APIServerImpl::Init(const sdk::ClusterOptions& options) { @@ -72,7 +79,6 @@ void APIServerImpl::Process(google::protobuf::RpcController* cntl_base, const Ht google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); auto* cntl = dynamic_cast(cntl_base); - // The unresolved path has no slashes at the beginning(guaranteed by brpc), it's not good for url parsing auto unresolved_path = "/" + cntl->http_request().unresolved_path(); auto method = cntl->http_request().method(); @@ -81,7 +87,6 @@ void APIServerImpl::Process(google::protobuf::RpcController* cntl_base, const Ht JsonWriter writer; provider_.handle(unresolved_path, method, req_body, writer); - cntl->response_attachment().append(writer.GetString()); } @@ -110,6 +115,12 @@ std::map mode_map{ void APIServerImpl::RegisterQuery() { provider_.post("/dbs/:db_name", [this](const InterfaceProvider::Params& param, const butil::IOBuf& req_body, JsonWriter& writer) { + auto start = absl::Now(); + absl::Cleanup method_latency = [this, start]() { + // TODO(hw): query should split into async/sync, online/offline? 
+ absl::Duration time = absl::Now() - start; + *md_recorder_.get_stats({"query"}) << absl::ToInt64Microseconds(time); + }; auto resp = GeneralResp(); auto db_it = param.find("db_name"); if (db_it == param.end()) { @@ -158,9 +169,9 @@ void APIServerImpl::RegisterQuery() { }); } -bool APIServerImpl::JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v, - const butil::rapidjson::Value& common_cols_v, - std::shared_ptr row) { +absl::Status APIServerImpl::JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v, + const butil::rapidjson::Value& common_cols_v, + std::shared_ptr row) { auto sch = row->GetSchema(); // scan all strings to init the total string length @@ -186,18 +197,20 @@ bool APIServerImpl::JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_c for (decltype(sch->GetColumnCnt()) i = 0; i < sch->GetColumnCnt(); ++i) { if (sch->IsConstant(i)) { if (!AppendJsonValue(common_cols_v[common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) { - return false; + return absl::InvalidArgumentError( + absl::StrCat("trans const ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed")); } ++common_idx; } else { if (!AppendJsonValue(non_common_cols_v[non_common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) { - return false; + return absl::InvalidArgumentError( + absl::StrCat("trans ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed")); } ++non_common_idx; } } - return true; + return absl::OkStatus(); } template @@ -281,9 +294,9 @@ bool APIServerImpl::AppendJsonValue(const butil::rapidjson::Value& v, hybridse:: } // common_cols_v is still an array, but non_common_cols_v is map, should find the value by the column name -bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v, - const butil::rapidjson::Value& common_cols_v, - std::shared_ptr row) { +absl::Status APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v, + 
const butil::rapidjson::Value& common_cols_v, + std::shared_ptr row) { auto sch = row->GetSchema(); // scan all strings to init the total string length @@ -300,8 +313,7 @@ bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_com if (sch->GetColumnType(i) == hybridse::sdk::kTypeString) { auto v = non_common_cols_v.FindMember(sch->GetColumnName(i).c_str()); if (v == non_common_cols_v.MemberEnd()) { - LOG(WARNING) << "can't find " << sch->GetColumnName(i); - return false; + return absl::InvalidArgumentError("can't find col " + sch->GetColumnName(i)); } str_len_sum += v->value.GetStringLength(); } @@ -313,23 +325,22 @@ bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_com for (decltype(sch->GetColumnCnt()) i = 0; i < sch->GetColumnCnt(); ++i) { if (sch->IsConstant(i)) { if (!AppendJsonValue(common_cols_v[common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) { - LOG(WARNING) << "set " << sch->GetColumnName(i) << " failed"; - return false; + return absl::InvalidArgumentError( + absl::StrCat("trans const ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed")); } ++common_idx; } else { auto v = non_common_cols_v.FindMember(sch->GetColumnName(i).c_str()); if (v == non_common_cols_v.MemberEnd()) { - LOG(WARNING) << "can't find " << sch->GetColumnName(i); - return false; + return absl::InvalidArgumentError("can't find " + sch->GetColumnName(i)); } if (!AppendJsonValue(v->value, sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) { - LOG(WARNING) << "set " << sch->GetColumnName(i) << " failed"; - return false; + return absl::InvalidArgumentError( + absl::StrCat("trans ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed")); } } } - return true; + return absl::OkStatus(); } void APIServerImpl::RegisterPut() { @@ -420,6 +431,11 @@ void APIServerImpl::RegisterExecSP() { void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvider::Params& param, const butil::IOBuf& 
req_body, JsonWriter& writer) { + auto start = absl::Now(); + absl::Cleanup method_latency = [this, start, has_common_col]() { + absl::Duration time = absl::Now() - start; + *md_recorder_.get_stats({has_common_col ? "sp" : "deployment"}) << absl::ToInt64Microseconds(time); + }; auto resp = GeneralResp(); auto db_it = param.find("db_name"); auto sp_it = param.find("sp_name"); @@ -458,7 +474,6 @@ void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvide return; } const auto& rows = input->value; - hybridse::sdk::Status status; // We need to use ShowProcedure to get input schema(should know which column is constant). // GetRequestRowByProcedure can't do that. @@ -498,13 +513,15 @@ void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvide writer << resp.Set("Invalid input data size in row " + std::to_string(i)); return; } - if (!JsonArray2SQLRequestRow(rows[i], common_cols_v, row)) { - writer << resp.Set("Translate to request row failed in array row " + std::to_string(i)); + if (auto st = JsonArray2SQLRequestRow(rows[i], common_cols_v, row); !st.ok()) { + writer << resp.Set("Translate to request row failed in array row " + std::to_string(i) + ", " + + st.ToString()); return; } } else if (rows[i].IsObject()) { - if (!JsonMap2SQLRequestRow(rows[i], common_cols_v, row)) { - writer << resp.Set("Translate to request row failed in map row " + std::to_string(i)); + if (auto st = JsonMap2SQLRequestRow(rows[i], common_cols_v, row); !st.ok()) { + writer << resp.Set("Translate to request row failed in map row " + std::to_string(i) + ", " + + st.ToString()); return; } } else { diff --git a/src/apiserver/api_server_impl.h b/src/apiserver/api_server_impl.h index 9c936c9748e..d17681d181c 100644 --- a/src/apiserver/api_server_impl.h +++ b/src/apiserver/api_server_impl.h @@ -31,6 +31,10 @@ #include "sdk/sql_cluster_router.h" #include "sdk/sql_request_row.h" +#include "absl/status/status.h" +#include "bvar/bvar.h" +#include 
"bvar/multi_dimension.h" // latency recorder + namespace openmldb { namespace apiserver { @@ -45,7 +49,7 @@ using butil::rapidjson::Writer; // Both input and output are json data. We use rapidjson to handle it. class APIServerImpl : public APIServer { public: - APIServerImpl() = default; + explicit APIServerImpl(const std::string& endpoint); ~APIServerImpl() override; bool Init(const sdk::ClusterOptions& options); bool Init(::openmldb::sdk::DBSDK* cluster); @@ -69,10 +73,10 @@ class APIServerImpl : public APIServer { void ExecuteProcedure(bool has_common_col, const InterfaceProvider::Params& param, const butil::IOBuf& req_body, JsonWriter& writer); // NOLINT - static bool JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v, - const butil::rapidjson::Value& common_cols_v, - std::shared_ptr row); - static bool JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v, + static absl::Status JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v, + const butil::rapidjson::Value& common_cols_v, + std::shared_ptr row); + static absl::Status JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v, const butil::rapidjson::Value& common_cols_v, std::shared_ptr row); template @@ -82,13 +86,19 @@ class APIServerImpl : public APIServer { // may get segmentation fault when throw boost::bad_lexical_cast, so we use std::from_chars template static bool FromString(const std::string& s, T& value) { // NOLINT - auto res = std::from_chars(s.data(), s.data() + s.size(), value); - return res.ec == std::errc() && (res.ptr - s.data() == s.size()); + if (auto res = std::from_chars(s.data(), s.data() + s.size(), value); res.ec == std::errc()) { + auto len = res.ptr - s.data(); + return len >= 0 ? 
(uint64_t)len == s.size() : false; + } else { + return false; + } } private: - std::shared_ptr sql_router_; + bvar::MultiDimension md_recorder_; InterfaceProvider provider_; + + std::shared_ptr sql_router_; // cluster_sdk_ is not owned by this class. ::openmldb::sdk::DBSDK* cluster_sdk_ = nullptr; }; diff --git a/src/apiserver/api_server_test.cc b/src/apiserver/api_server_test.cc index d14037ae506..11dd390b914 100644 --- a/src/apiserver/api_server_test.cc +++ b/src/apiserver/api_server_test.cc @@ -1145,7 +1145,8 @@ TEST_F(APIServerTest, jsonInput) { ASSERT_FALSE(show_cntl.Failed()) << show_cntl.ErrorText(); LOG(INFO) << "get sp resp: " << show_cntl.response_attachment(); - // call deployment in json style input(won't check if it's a sp or deployment) + // call sp in deployment api with json style input(won't check if it's a sp or deployment), so it'll have field + // `common_cols_data` { brpc::Controller cntl; cntl.http_request().set_method(brpc::HTTP_METHOD_POST); diff --git a/src/apiserver/interface_provider.cc b/src/apiserver/interface_provider.cc index c4be99c4726..4a482b3aab0 100644 --- a/src/apiserver/interface_provider.cc +++ b/src/apiserver/interface_provider.cc @@ -25,6 +25,8 @@ #include #include "boost/algorithm/string/split.hpp" +#include "butil/time.h" +#include "bvar/bvar.h" #include "glog/logging.h" namespace openmldb { @@ -159,6 +161,8 @@ void InterfaceProvider::registerRequest(brpc::HttpMethod type, std::string const bool InterfaceProvider::handle(const std::string& path, const brpc::HttpMethod& method, const butil::IOBuf& req_body, JsonWriter& writer) { + butil::Timer tm; + tm.start(); auto err = GeneralResp(); Url url; @@ -190,6 +194,9 @@ bool InterfaceProvider::handle(const std::string& path, const brpc::HttpMethod& } auto params = extractParameters(url, request->url); + tm.stop(); + route_recorder_ << tm.u_elapsed(); + request->callback(params, req_body, writer); return true; } diff --git a/src/apiserver/interface_provider.h 
b/src/apiserver/interface_provider.h index e2ffe4661a4..8589b9229c5 100644 --- a/src/apiserver/interface_provider.h +++ b/src/apiserver/interface_provider.h @@ -32,6 +32,7 @@ #include "apiserver/json_helper.h" #include "brpc/http_method.h" // HttpMethod #include "butil/iobuf.h" // IOBuf +#include "bvar/bvar.h" // latency recorder #include "proto/api_server.pb.h" namespace openmldb { @@ -111,7 +112,7 @@ class ReducedUrlParser { class InterfaceProvider { public: - InterfaceProvider() = default; + explicit InterfaceProvider(const std::string& metric_prefix) : route_recorder_(metric_prefix, "http_route") {} InterfaceProvider& operator=(InterfaceProvider const&) = delete; InterfaceProvider(InterfaceProvider const&) = delete; @@ -160,6 +161,9 @@ class InterfaceProvider { void registerRequest(brpc::HttpMethod, const std::string& path, std::function&& callback); private: + // we only record route latency, method latency is recorded in callback(you may need record in parts), defined in + // api server impl + bvar::LatencyRecorder route_recorder_; std::unordered_map> requests_; }; diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 9357b23e29a..221fbb9e976 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -1150,7 +1150,7 @@ bool TabletClient::SubBatchRequestQuery(const ::openmldb::api::SQLBatchRequestQu if (callback == nullptr) { return false; } - return client_.SendRequest(&::openmldb::api::TabletServer_Stub::SQLBatchRequestQuery, + return client_.SendRequest(&::openmldb::api::TabletServer_Stub::SubBatchRequestQuery, callback->GetController().get(), &request, callback->GetResponse().get(), callback); } diff --git a/src/cmd/openmldb.cc b/src/cmd/openmldb.cc index 3cf22b2df6d..6969e000c0e 100644 --- a/src/cmd/openmldb.cc +++ b/src/cmd/openmldb.cc @@ -3878,7 +3878,7 @@ void StartAPIServer() { GetRealEndpoint(&real_endpoint); } - auto api_service = std::make_unique<::openmldb::apiserver::APIServerImpl>(); + auto api_service = 
std::make_unique<::openmldb::apiserver::APIServerImpl>(real_endpoint); if (!FLAGS_nameserver.empty()) { std::vector vec; boost::split(vec, FLAGS_nameserver, boost::is_any_of(":")); diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index 1896ac7c674..39e33637ba0 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -3697,7 +3697,7 @@ TEST_F(SqlCmdTest, SelectWithAddNewIndex) { // -------------------------------------------------------------------------------------- // basic functional UTs to test if it is correct for deploy query response time collection -// see NameServerImpl::SyncDeployStats & TabletImpl::TryCollectDeployStats +// see TabletImpl::CollectDeployStats // -------------------------------------------------------------------------------------- // a proxy class to create and cleanup deployment stats more gracefully @@ -3770,12 +3770,6 @@ struct DeploymentEnv { ASSERT_TRUE(status.IsOK()) << status.msg << "\n" << status.trace; } - void EnableDeployStats() { - ProcessSQLs(sr_, { - "set global deploy_stats = 'on'", - }); - } - sdk::SQLClusterRouter* sr_; absl::BitGen gen_; // variables generate randomly in SetUp @@ -3802,111 +3796,6 @@ struct DeploymentEnv { } }; -static const char QueryDeployResponseTime[] = "select * from INFORMATION_SCHEMA.DEPLOY_RESPONSE_TIME"; - -TEST_P(DBSDKTest, DeployStatsNotEnableByDefault) { - auto cli = GetParam(); - cs = cli->cs; - sr = cli->sr; - - DeploymentEnv env(sr); - env.SetUp(); - env.CallDeployProcedureBatch(); - env.CallDeployProcedure(); - - absl::SleepFor(absl::Seconds(3)); - - hybridse::sdk::Status status; - auto rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(0, rs->Size()); - - env.EnableDeployStats(); - - absl::SleepFor(absl::Seconds(3)); - - // HandleSQL exists only for purpose of printing - HandleSQL(QueryDeployResponseTime); - rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - 
ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(0, rs->Size()); -} - -TEST_P(DBSDKTest, DeployStatsEnabledAfterSetGlobal) { - auto cli = GetParam(); - cs = cli->cs; - sr = cli->sr; - - // FIXME(#1547): test skiped due to Deploy Response Time can't enable in standalone mode - if (cs->IsClusterMode()) { - DeploymentEnv env(sr); - env.SetUp(); - env.EnableDeployStats(); - // sleep a while for global variable notification - absl::SleepFor(absl::Seconds(2)); - - hybridse::sdk::Status status; - auto rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - // as deploy stats in tablet is lazy managed, the deploy stats will stay empty util the first procedure call - // happens - ASSERT_EQ(0, rs->Size()); - - // warm up deploy stats - env.CallDeployProcedureBatch(); - env.CallDeployProcedure(); - - absl::SleepFor(absl::Seconds(3)); - - HandleSQL(QueryDeployResponseTime); - rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(TIME_DISTRIBUTION_BUCKET_COUNT, rs->Size()); - - int cnt = 0; - while (rs->Next()) { - EXPECT_EQ(absl::StrCat(env.db_, ".", env.dp_name_), rs->GetAsStringUnsafe(0)); - cnt += rs->GetInt32Unsafe(2); - } - EXPECT_EQ(2, cnt); - } -} - -TEST_P(DBSDKTest, DeployStatsOnlyCollectDeployProcedure) { - auto cli = GetParam(); - cs = cli->cs; - sr = cli->sr; - if (cs->IsClusterMode()) { - DeploymentEnv env(sr); - env.SetUp(); - - env.EnableDeployStats(); - absl::SleepFor(absl::Seconds(2)); - - for (int i = 0; i < 5; ++i) { - env.CallProcedure(); - } - - for (int i = 0; i < 10; ++i) { - env.CallDeployProcedureBatch(); - env.CallDeployProcedure(); - } - absl::SleepFor(absl::Seconds(3)); - - HandleSQL(QueryDeployResponseTime); - hybridse::sdk::Status status; - auto rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(TIME_DISTRIBUTION_BUCKET_COUNT, rs->Size()); - int cnt = 0; - while (rs->Next()) { 
- EXPECT_EQ(absl::StrCat(env.db_, ".", env.dp_name_), rs->GetAsStringUnsafe(0)); - cnt += rs->GetInt32Unsafe(2); - } - EXPECT_EQ(10 + 10, cnt); - } -} - class StripSpaceTest : public ::testing::TestWithParam> {}; std::vector> strip_cases = { diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index 862ee42d320..53d7d655864 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -72,7 +72,6 @@ DECLARE_int32(make_snapshot_time); DECLARE_int32(make_snapshot_check_interval); DECLARE_bool(use_name); DECLARE_bool(enable_distsql); -DECLARE_uint32(sync_deploy_stats_timeout); namespace openmldb { namespace nameserver { @@ -1470,8 +1469,6 @@ bool NameServerImpl::Init(const std::string& zk_cluster, const std::string& zk_p task_vec_.resize(FLAGS_name_server_task_max_concurrency + FLAGS_name_server_task_concurrency_for_replica_cluster); task_thread_pool_.DelayTask(FLAGS_make_snapshot_check_interval, boost::bind(&NameServerImpl::SchedMakeSnapshot, this)); - task_thread_pool_.DelayTask(FLAGS_sync_deploy_stats_timeout, - boost::bind(&NameServerImpl::ScheduleSyncDeployStats, this)); return true; } @@ -9328,21 +9325,6 @@ void NameServerImpl::DropProcedureOnTablet(const std::string& db_name, const std } } - bool is_deployment_procedure = false; - { - std::lock_guard lock(mu_); - auto it = db_sp_info_map_.find(db_name); - if (it != db_sp_info_map_.end()) { - auto iit = it->second.find(sp_name); - is_deployment_procedure = iit != it->second.end() && iit->second->type() == type::kReqDeployment; - } - } - std::shared_ptr info; - auto success = GetTableInfo(DEPLOY_RESPONSE_TIME, INFORMATION_SCHEMA_DB, &info); - // NOTE: deploy stats records will delete even when global setting deploy_stats is turned off while there are - // records for previous deploy query (during deploy_stats = 'on') - bool drop_deploy_stats = is_deployment_procedure && success && info != nullptr; - for (auto tb_client : tb_client_vec) { if 
(!tb_client->DropProcedure(db_name, sp_name)) { PDLOG(WARNING, "drop procedure on tablet failed. db_name[%s], sp_name[%s], endpoint[%s]", db_name.c_str(), @@ -9353,44 +9335,6 @@ void NameServerImpl::DropProcedureOnTablet(const std::string& db_name, const std PDLOG(INFO, "drop procedure on tablet success. db_name[%s], sp_name[%s], endpoint[%s]", db_name.c_str(), sp_name.c_str(), tb_client->GetEndpoint().c_str()); } - - if (drop_deploy_stats && info->table_partition_size() > 0) { - std::string endpoint; - for (auto& meta : info->table_partition()[0].partition_meta()) { - if (meta.is_leader()) { - endpoint = meta.endpoint(); - } - } - auto tablet_info = GetTablet(endpoint); - if (tablet_info == nullptr) { - PDLOG(ERROR, "no leader exists for system table %s", DEPLOY_RESPONSE_TIME); - return; - } - - auto tb_client = tablet_info->client_; - - auto deploy_name = absl::StrCat(db_name, ".", sp_name); - uint32_t pid = static_cast(::openmldb::base::hash64(deploy_name) % info->table_partition_size()); - auto time = absl::Microseconds(1); - int cnt = 0; - std::string msg; - while (cnt++ < TIME_DISTRIBUTION_BUCKET_COUNT - 1) { - auto key = absl::StrCat(deploy_name, "|", statistics::GetDurationAsStr(time, statistics::TimeUnit::SECOND)); - if (!tb_client->Delete(info->tid(), pid, key, "", msg)) { - // NOTE: some warning appears but is expected, just ingore: - // 1. when you create a deploy query but not call it any time before delete it - // 2. 
deploy_stats is always turned off - PDLOG(WARNING, "failed to delete entry in %s in tablet %s where key = %s : %s", DEPLOY_RESPONSE_TIME, - tb_client->GetEndpoint(), key, msg); - } - time *= 10; - } - auto key = absl::StrCat(deploy_name, "|", MAX_STRING); - if (!tb_client->Delete(info->tid(), pid, key, "", msg)) { - PDLOG(WARNING, "failed to delete entry in %s in tablet %s where key = %s : %s", DEPLOY_RESPONSE_TIME, - tb_client->GetEndpoint(), key, msg); - } - } } void NameServerImpl::DropProcedure(RpcController* controller, const api::DropProcedureRequest* request, @@ -9908,143 +9852,6 @@ base::Status NameServerImpl::InitGlobalVarTable() { return {}; } -static const std::string& QueryDeployStats() { - static const std::string query_deploy_stats = - absl::StrCat("select * from ", nameserver::INFORMATION_SCHEMA_DB, ".", nameserver::DEPLOY_RESPONSE_TIME); - return query_deploy_stats; -} - -static const std::string& QueryDeployStatsIsOn() { - static const std::string query_deploy_stats_is_on = - absl::StrCat("select * from ", nameserver::INFORMATION_SCHEMA_DB, ".", nameserver::GLOBAL_VARIABLES, - " where Variable_name = 'deploy_stats'"); - return query_deploy_stats_is_on; -} - -void NameServerImpl::SyncDeployStats() { - // Step one: check condition for deploy stats. only all of those meet: - // 1. current ns is master - // 2. 
deploy_stats global variable is set to 'on' or 'true' - if (startup_mode_ == type::kStandalone && running_.load(std::memory_order_acquire)) { - DLOG(INFO) << "sync deploy stats skipped for non-leader ns on cluster"; - FreeSdkConnection(); - return; - } - - if (!GetSdkConnection()) { - LOG(ERROR) << "failed to get sdk connection"; - return; - } - auto sr = std::atomic_load_explicit(&sr_, std::memory_order_acquire); - if (sr == nullptr) { - LOG(ERROR) << "sdk connection is null"; - return; - } - ::hybridse::sdk::Status s; - auto rs = sr->ExecuteSQLParameterized("", QueryDeployStatsIsOn(), {}, &s); - if (!s.IsOK()) { - LOG(ERROR) << "[ERROR] query global variable deploy_stats: " << s.msg; - return; - } - - bool sync_stats = false; - while (rs->Next()) { - auto val = rs->GetStringUnsafe(1); - sync_stats = (val == "on" || val == "true"); - } - - if (!sync_stats) { - DLOG(INFO) << "sync deploy stats skipped when deploy_stats is off"; - return; - } - - // Step two: Fetch And Flush deploy stats from each tablet - std::unordered_map> active_tablets; - { - std::lock_guard lock(mu_); - for (auto& kv : tablets_) { - if (kv.second->Health()) { - active_tablets.emplace(kv.first, kv.second->client_); - } - } - } - statistics::DeployResponseTimeRowReducer reducer; - for (auto& client : active_tablets) { - ::openmldb::api::DeployStatsResponse res; - if (!client.second->GetAndFlushDeployStats(&res)) { - LOG(ERROR) << "GetAndFlushDeployStats from " << client.first << " failed "; - continue; - } - - for (auto& r : res.rows()) { - reducer.Reduce(r.deploy_name(), - statistics::ParseDurationFromStr(r.time(), statistics::TimeUnit::MICRO_SECOND), r.count(), - statistics::ParseDurationFromStr(r.total(), statistics::TimeUnit::MICRO_SECOND)); - } - } - - // Step three: Query old deploy response time from table - rs = sr->ExecuteSQLParameterized("", QueryDeployStats(), {}, &s); - if (!s.IsOK()) { - LOG(ERROR) << "[ERROR] querying DEPLOY_RESPONSE_TIME" << s.msg; - return; - } - - // old reducer 
help find those rows already in table and but there is - // no new incremental stats reduced from all tablets - statistics::DeployResponseTimeRowReducer old_reducer; - while (rs->Next()) { - auto name = rs->GetAsStringUnsafe(0); - auto time = rs->GetAsStringUnsafe(1); - uint64_t cnt = 0; - if (rs->GetSchema()->GetColumnType(2) == hybridse::sdk::DataType::kTypeInt64) { - cnt = static_cast(rs->GetInt64Unsafe(2)); - } else { - cnt = static_cast(rs->GetInt32Unsafe(2)); - } - auto total = rs->GetAsStringUnsafe(3); - - auto ts = statistics::ParseDurationFromStr(time, statistics::TimeUnit::SECOND); - auto tt = statistics::ParseDurationFromStr(total, statistics::TimeUnit::SECOND); - - reducer.Reduce(name, ts, cnt, tt); - old_reducer.Reduce(name, ts, cnt, tt); - } - - // Step four: update DEPLOY_RESPONSE_TIME table by the new rows - // only for rows that meet any of conditiions below: - // 1. the incremental count and total is bigger than 0 - // 2. original table do not have row for that (deploy_name + time) key yet - std::string insert_deploy_stat = absl::StrCat("insert into ", nameserver::INFORMATION_SCHEMA_DB, ".", - nameserver::DEPLOY_RESPONSE_TIME, " values "); - for (auto& row : reducer.Rows()) { - auto old_it = old_reducer.Find(row->deploy_name_, row->time_); - if (old_it != nullptr && row->count_ == old_it->count_) { - // don't update table only if there is no incremental data, and there is record already in table - continue; - } - - std::string time = row->GetTimeAsStr(statistics::TimeUnit::SECOND); - auto insert_sql = absl::StrCat(insert_deploy_stat, " ( '", row->deploy_name_, "', '", - time, "', ", row->count_, - ",'", row->GetTotalAsStr(statistics::TimeUnit::SECOND), "' )"); - - hybridse::sdk::Status st; - DLOG(INFO) << "sync deploy stats: executing sql: " << insert_sql; - sr->ExecuteInsert("", insert_sql, &st); - if (!st.IsOK()) { - LOG(ERROR) << "[ERROR] insert deploy stats failed: " << st.msg; - } - } - // TODO(ace): add logs for summary how many rows 
affected and time cost -} - -void NameServerImpl::ScheduleSyncDeployStats() { - SyncDeployStats(); - task_thread_pool_.DelayTask(FLAGS_sync_deploy_stats_timeout, - boost::bind(&NameServerImpl::ScheduleSyncDeployStats, this)); -} - /// \beirf create a SQLClusterRouter instance for use like monitoring statistics collecting /// the actual instance is stored in `sr_` member /// diff --git a/src/nameserver/name_server_impl.h b/src/nameserver/name_server_impl.h index b9755c4aa1c..825c5870d8a 100644 --- a/src/nameserver/name_server_impl.h +++ b/src/nameserver/name_server_impl.h @@ -670,11 +670,6 @@ class NameServerImpl : public NameServer { uint64_t GetTerm() const; - // write deploy statistics into table - void SyncDeployStats(); - - void ScheduleSyncDeployStats(); - bool GetSdkConnection(); void FreeSdkConnection(); diff --git a/src/nameserver/name_server_test.cc b/src/nameserver/name_server_test.cc index f1ad0f86eab..3c2089ab251 100644 --- a/src/nameserver/name_server_test.cc +++ b/src/nameserver/name_server_test.cc @@ -44,7 +44,6 @@ DECLARE_int32(zk_keep_alive_check_interval); DECLARE_int32(make_snapshot_threshold_offset); DECLARE_uint32(name_server_task_max_concurrency); DECLARE_uint32(system_table_replica_num); -DECLARE_uint32(sync_deploy_stats_timeout); DECLARE_bool(auto_failover); using brpc::Server; @@ -1294,6 +1293,5 @@ int main(int argc, char** argv) { FLAGS_ssd_root_path = tmp_path.GetTempPath("ssd"); FLAGS_hdd_root_path = tmp_path.GetTempPath("hdd"); FLAGS_system_table_replica_num = 0; - FLAGS_sync_deploy_stats_timeout = 1000000; return RUN_ALL_TESTS(); } diff --git a/src/statistics/query_response_time/CMakeLists.txt b/src/statistics/query_response_time/CMakeLists.txt index e309934f318..b03aeef65c5 100644 --- a/src/statistics/query_response_time/CMakeLists.txt +++ b/src/statistics/query_response_time/CMakeLists.txt @@ -12,14 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-set(LINK_LIBS absl::time absl::random_random absl::strings absl::status absl::statusor absl::synchronization ${GTEST_LIBRARIES} ${GLOG_LIBRARY} ${GFLAGS_LIBRARY}) +set(LINK_LIBS absl::time absl::random_random absl::strings absl::status absl::statusor absl::synchronization ${BRPC_LIBS} ${GTEST_LIBRARIES} ${GLOG_LIBRARY} ${GFLAGS_LIBRARY}) link_libraries(${LINK_LIBS}) if(CMAKE_CXX_COMPILER_ID MATCHES "(AppleClang)|(Clang)") add_definitions(-Wthread-safety) endif() -add_library(query_response_time STATIC ${CMAKE_CURRENT_SOURCE_DIR}/deploy_query_response_time.cc ${CMAKE_CURRENT_SOURCE_DIR}/query_response_time.cc) +add_library(query_response_time STATIC ${CMAKE_CURRENT_SOURCE_DIR}/deployment_metric_collector.cc) function(add_test_file TARGET_NAME SOURCE_NAME) add_executable(${TARGET_NAME} ${SOURCE_NAME}) @@ -40,8 +40,7 @@ function(add_test_file TARGET_NAME SOURCE_NAME) endfunction(add_test_file) if(TESTING_ENABLE) - add_test_file(query_response_time_test ${CMAKE_CURRENT_SOURCE_DIR}/query_response_time_test.cc) - add_test_file(deploy_query_response_time_test ${CMAKE_CURRENT_SOURCE_DIR}/deploy_query_response_time_test.cc) + add_test_file(deployment_metric_collector_test ${CMAKE_CURRENT_SOURCE_DIR}/deployment_metric_collector_test.cc) if(CMAKE_PROJECT_NAME STREQUAL "openmldb") set(test_list ${test_list} PARENT_SCOPE) diff --git a/src/statistics/query_response_time/deployment_metric_collector.cc b/src/statistics/query_response_time/deployment_metric_collector.cc new file mode 100644 index 00000000000..252821254b8 --- /dev/null +++ b/src/statistics/query_response_time/deployment_metric_collector.cc @@ -0,0 +1,43 @@ +/* + * Copyright 2022 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "statistics/query_response_time/deployment_metric_collector.h" + +namespace openmldb::statistics { + +absl::Status DeploymentMetricCollector::Collect(const std::string& db, const std::string& deploy_name, + absl::Duration time) { + absl::ReaderMutexLock lock(&mutex_); + auto it = md_recorder_->get_stats({db, deploy_name}); + if (it == nullptr) { + LOG(WARNING) << "reach limit size, collect failed"; + return absl::OutOfRangeError("multi-dimensional recorder reaches limit size, please delete old deploy"); + } + *it << absl::ToInt64Microseconds(time); + return absl::OkStatus(); +} + +absl::Status DeploymentMetricCollector::DeleteDeploy(const std::string& db, const std::string& deploy_name) { + absl::ReaderMutexLock lock(&mutex_); + md_recorder_->delete_stats({db, deploy_name}); + return absl::OkStatus(); +} + +void DeploymentMetricCollector::Reset() { + absl::WriterMutexLock lock(&mutex_); + md_recorder_ = make_shared(prefix_); +} +} // namespace openmldb::statistics diff --git a/src/statistics/query_response_time/deployment_metric_collector.h b/src/statistics/query_response_time/deployment_metric_collector.h new file mode 100644 index 00000000000..da93045ba08 --- /dev/null +++ b/src/statistics/query_response_time/deployment_metric_collector.h @@ -0,0 +1,79 @@ +/* + * Copyright 2022 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SRC_STATISTICS_QUERY_RESPONSE_TIME_DEPLOYMENT_METRIC_COLLECTOR_H_ +#define SRC_STATISTICS_QUERY_RESPONSE_TIME_DEPLOYMENT_METRIC_COLLECTOR_H_ + +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/status/status.h" +#include "absl/synchronization/mutex.h" +#include "absl/time/time.h" +#include "bvar/bvar.h" +#include "bvar/multi_dimension.h" +#include "gflags/gflags.h" + +namespace openmldb::statistics { +class DeploymentMetricCollector { + public: + typedef typename bvar::MultiDimension MDRecorder; + explicit DeploymentMetricCollector(const std::string& prefix) : prefix_(prefix), md_recorder_(make_shared(prefix)) { + // already expose_as when MultiDimension ctor + } + // collector is not copyable + DeploymentMetricCollector(const DeploymentMetricCollector& c) = delete; + + ~DeploymentMetricCollector() {} + // stats are keyed by (db, deploy_name)
+ absl::Status Collect(const std::string& db, const std::string& deploy_name, absl::Duration time) + LOCKS_EXCLUDED(mutex_); + absl::Status DeleteDeploy(const std::string& db, const std::string& deploy_name) LOCKS_EXCLUDED(mutex_); + void Reset() LOCKS_EXCLUDED(mutex_); + + // usually used for debug + std::string Desc(const std::list& key) LOCKS_EXCLUDED(mutex_) { + absl::ReaderMutexLock lock(&mutex_); + std::stringstream ss; + if (key.empty()) { + md_recorder_->describe(ss); + } else if (md_recorder_->has_stats(key)) { + auto rd = md_recorder_->get_stats(key); + ss << "count:" << rd->count() << ", qps:" << rd->qps() << ", latency:[" << rd->latency() << "," + << rd->latency_percentile(0.8) << "," << rd->latency_percentile(0.9) << "," + << rd->latency_percentile(0.99) << "," << rd->latency_percentile(0.999) << "," + << rd->latency_percentile(0.9999) << "]"; + } else { + ss << "no stats for key"; + } + + return ss.str(); + } + + static std::shared_ptr make_shared(const std::string& prefix) { + MDRecorder::key_type labels = {"db", "deployment"}; + return std::make_shared(prefix, "deployment", labels); + } + + private: + std::string prefix_; // for reset + // not copyable and can't clear, so use ptr + // MultiDimension can't define recorder window size by yourself, bvar_dump_interval is the only way + std::shared_ptr md_recorder_ GUARDED_BY(mutex_); + mutable absl::Mutex mutex_; // protects md_recorder_ +}; +} // namespace openmldb::statistics +#endif // SRC_STATISTICS_QUERY_RESPONSE_TIME_DEPLOYMENT_METRIC_COLLECTOR_H_ diff --git a/src/statistics/query_response_time/deployment_metric_collector_test.cc b/src/statistics/query_response_time/deployment_metric_collector_test.cc new file mode 100644 index 00000000000..6ca18839a69 --- /dev/null +++ b/src/statistics/query_response_time/deployment_metric_collector_test.cc @@ -0,0 +1,133 @@ +/* + * Copyright 2022 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with
the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "statistics/query_response_time/deployment_metric_collector.h" + +#include +#include +#include +#include + +#include "absl/random/random.h" +#include "absl/strings/str_cat.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "gflags/gflags.h" +#include "glog/logging.h" +#include "gtest/gtest.h" + +#include +#include + +namespace bvar { +DECLARE_int32(bvar_dump_interval); +} + +namespace openmldb { +namespace statistics { + +class CollectorTest : public ::testing::Test { + public: + CollectorTest() = default; + ~CollectorTest() override = default; +}; + +using namespace std; +void mem_usage() { + double vm_usage = 0.0; + double resident_set = 0.0; + ifstream stat_stream("/proc/self/stat", ios_base::in); // get info from proc directory + // create some variables to get info + string pid, comm, state, ppid, pgrp, session, tty_nr; + string tpgid, flags, minflt, cminflt, majflt, cmajflt; + string utime, stime, cutime, cstime, priority, nice; + string O, itrealvalue, starttime; + unsigned long vsize; + long rss; + stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt >> + majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> O >> itrealvalue >> starttime >> + vsize >> rss; // don't care about the rest + stat_stream.close(); + long page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // page size in KB (sysconf reports it in bytes) + vm_usage = vsize / 1024.0; + resident_set = rss * page_size_kb; + LOG(INFO) << "VM: " << vm_usage <<
"KB; RSS: " << resident_set << "KB"; +} + +TEST_F(CollectorTest, MemoryTest) { + // let's see how much memory will be used + auto test = [](int db_size, int dpl_size, int request_cnt, bool will_fail = false) { + DeploymentMetricCollector collector("test"); + for (int db_idx = 0; db_idx < db_size; db_idx++) { + auto db = "db" + std::to_string(db_idx); + for (int i = 0; i < dpl_size; i++) { + for (int t = 0; t < request_cnt; t++) { + auto st = collector.Collect(db, "d" + std::to_string(i), absl::Microseconds(10)); + } + } + } + if (will_fail) { + return; + } + auto desc = collector.Desc({}); + LOG(INFO) << desc; + ASSERT_TRUE(desc == + absl::StrCat(R"({"name" : "test_deployment", "labels" : ["db", "deployment"], "stats_count" : )", + db_size * dpl_size, "}")) + << desc; + // peek one + auto dstat = collector.Desc({"db0", "d0"}); + LOG(INFO) << dstat; + // can't check qps, and latency is calc from window, it'll == 0 if get too fast, so we just check count + ASSERT_TRUE(dstat.find("count:" + std::to_string(request_cnt)) != std::string::npos) << dstat; + mem_usage(); + }; + // empty mem VM: 104948KB; RSS: 5752KB + mem_usage(); + // recorder mem for one label is stable, VM: ~105576KB; RSS: ~6512KB, even collect 10M requests + // but VM&RSS contains other memory, should use diff to get recorder mem + test(1, 1, 1000); // + ~0.5M + test(1, 1, 10000); + test(1, 1, 100000); + // disable for speed + // test(1, 1, 1000000); + // test(1, 1, 10000000); + + // VM: 105568KB; RSS: 6628KB + // VM: 105964KB; RSS: 6984KB + // VM: 110056KB; RSS: 11208KB + // VM: 341356KB; RSS: 126256KB + // VM: 344076KB; RSS: 129936KB + // test(1, 10, 1000); + // test(1, 100, 1000); + // test(1, 1000, 1000); + // test(1, 10000, 1000); + // test(10, 1000, 1000); + + // MAX_MULTI_DIMENSION_STATS_COUNT = 20000, so total alive deployment count <= 20001 + // If we need more, need a repetitive task to reset (K*bvar_dump_interval?)
or add LabelLatencyRecorder + test(1, 20002, 1, true); +} + +} // namespace statistics +} // namespace openmldb + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + ::google::ParseCommandLineFlags(&argc, &argv, true); + bvar::FLAGS_bvar_dump_interval = 75; + return RUN_ALL_TESTS(); +} diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index f30f1f8b74b..1c51ebbe55c 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -191,7 +191,10 @@ bool TabletImpl::Init(const std::string& zk_cluster, const std::string& zk_path, mode_recycle_root_paths_[::openmldb::common::kSSD]); ::openmldb::base::SplitString(FLAGS_recycle_bin_hdd_root_path, ",", mode_recycle_root_paths_[::openmldb::common::kHDD]); - deploy_collector_ = std::make_unique<::openmldb::statistics::DeployQueryTimeCollector>(); + // if want /brpc_metrics, prefix should be g_server_info_prefix+(when no server_info_name), means + // rpc_server_ if standalone, diy + deploy_collector_ = std::make_unique<::openmldb::statistics::DeploymentMetricCollector>( + "rpc_server_" + endpoint.substr(endpoint.find(":") + 1)); if (!zk_cluster.empty()) { zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path); @@ -1643,15 +1646,15 @@ void TabletImpl::Query(RpcController* ctrl, const openmldb::api::QueryRequest* r brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(ctrl); butil::IOBuf& buf = cntl->response_attachment(); - ProcessQuery(ctrl, request, response, &buf); + ProcessQuery(false, ctrl, request, response, &buf);  // not a sub query: deploy stats are collected } -void TabletImpl::ProcessQuery(RpcController* ctrl, const openmldb::api::QueryRequest* request, +void TabletImpl::ProcessQuery(bool is_sub, RpcController* ctrl, const openmldb::api::QueryRequest* request, ::openmldb::api::QueryResponse* response, butil::IOBuf* buf) { auto start = absl::Now(); - absl::Cleanup deploy_collect_task =
[this, is_sub, request, start]() { if (this->IsCollectDeployStatsEnabled()) { - if (request->is_procedure() && request->has_db() && request->has_sp_name()) { + if (!is_sub && request->is_procedure() && request->has_db() && request->has_sp_name()) { this->TryCollectDeployStats(request->db(), request->sp_name(), start); } } @@ -1773,7 +1776,8 @@ void TabletImpl::SubQuery(RpcController* ctrl, const openmldb::api::QueryRequest brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(ctrl); butil::IOBuf& buf = cntl->response_attachment(); - ProcessQuery(ctrl, request, response, &buf); + // subquery don't need to collect deploy stats + ProcessQuery(true, ctrl, request, response, &buf); } void TabletImpl::SQLBatchRequestQuery(RpcController* ctrl, const openmldb::api::SQLBatchRequestQueryRequest* request, @@ -1782,15 +1786,15 @@ void TabletImpl::SQLBatchRequestQuery(RpcController* ctrl, const openmldb::api:: brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(ctrl); butil::IOBuf& buf = cntl->response_attachment(); - return ProcessBatchRequestQuery(ctrl, request, response, buf); + return ProcessBatchRequestQuery(false, ctrl, request, response, buf); } -void TabletImpl::ProcessBatchRequestQuery(RpcController* ctrl, +void TabletImpl::ProcessBatchRequestQuery(bool is_sub, RpcController* ctrl, const openmldb::api::SQLBatchRequestQueryRequest* request, openmldb::api::SQLBatchRequestQueryResponse* response, butil::IOBuf& buf) { absl::Time start = absl::Now(); - absl::Cleanup deploy_collect_task = [this, request, start]() { + absl::Cleanup deploy_collect_task = [this, is_sub, request, start]() { if (this->IsCollectDeployStatsEnabled()) { - if (request->is_procedure() && request->has_db() && request->has_sp_name()) { + if (!is_sub && request->is_procedure() && request->has_db() && request->has_sp_name()) { this->TryCollectDeployStats(request->db(), request->sp_name(), start); } } @@ -1962,7 +1966,7 @@ void 
TabletImpl::SubBatchRequestQuery(RpcController* ctrl, const openmldb::api:: brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(ctrl); butil::IOBuf& buf = cntl->response_attachment(); - return ProcessBatchRequestQuery(ctrl, request, response, buf); + return ProcessBatchRequestQuery(true, ctrl, request, response, buf); } void TabletImpl::ChangeRole(RpcController* controller, const ::openmldb::api::ChangeRoleRequest* request, @@ -4286,6 +4290,12 @@ void TabletImpl::UpdateGlobalVarTable() { it->Next(); } std::atomic_store_explicit(&global_variables_, new_global_var, std::memory_order_relaxed); + + // no DEPLOY_STATS when init, so we can get the first DEPLOY_STATS value here, and all later changes are handled here too + // we assume that global vars change is low frequency, so we reset here instead of in the repeated task + if (!IsCollectDeployStatsEnabled()) { + deploy_collector_->Reset(); + } return; } @@ -5311,7 +5321,7 @@ void TabletImpl::DropProcedure(RpcController* controller, const ::openmldb::api: if (is_deployment_procedure) { auto collector_key = absl::StrCat(db_name, ".", sp_name); - auto s = deploy_collector_->DeleteDeploy(collector_key); + auto s = deploy_collector_->DeleteDeploy(db_name, sp_name); if (!s.ok()) { LOG(ERROR) << "[ERROR] delete deploy collector: " << s; } else { @@ -5464,31 +5474,12 @@ bool TabletImpl::IsCollectDeployStatsEnabled() const { } // try collect the cost time for a deployment procedure into collector -// if the procedure found in collector, it is colelcted directly -// if not, the function will try find the procedure info from procedure cache, and if turns out is a deployment -// procedure, retry collecting by firstly adding the missing deployment procedure into collector +// if failed, log inside, no extra handle void TabletImpl::TryCollectDeployStats(const std::string& db, const std::string& name, absl::Time start_time) { absl::Time now = absl::Now(); absl::Duration time = now - start_time; - const std::string
deploy_name = absl::StrCat(db, ".", name); - auto s = deploy_collector_->Collect(deploy_name, time); - if (absl::IsNotFound(s)) { - // deploy collector is regarded as non-update-to-date cache for sp_info (sp_cache_ should be up-to-date) - // so when Not Found error happens, retry once again by AddDeploy first, with the help of sp_cache_ - auto sp_info = sp_cache_->FindSpProcedureInfo(db, name); - if (sp_info.ok() && sp_info.value()->GetType() == hybridse::sdk::kReqDeployment) { - s = deploy_collector_->AddDeploy(deploy_name); - if (!s.ok()) { - LOG(ERROR) << "[ERROR] add deploy collector: " << s; - return; - } - s = deploy_collector_->Collect(deploy_name, time); - } - } - if (!s.ok()) { - LOG(ERROR) << "[ERROR] collect deploy stat: " << s; - } - DLOG(INFO) << "collected " << deploy_name << " for " << time; + auto st = deploy_collector_->Collect(db, name, time); + DLOG(INFO) << "collect " << db << "." << name << " latency " << time; } void TabletImpl::BulkLoad(RpcController* controller, const ::openmldb::api::BulkLoadRequest* request, @@ -5745,14 +5736,7 @@ void TabletImpl::GetAndFlushDeployStats(::google::protobuf::RpcController* contr ::google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); - auto rs = deploy_collector_->Flush(); - for (auto& r : rs) { - auto new_row = response->add_rows(); - new_row->set_deploy_name(r.deploy_name_); - new_row->set_time(r.GetTimeAsStr(statistics::TimeUnit::MICRO_SECOND)); - new_row->set_count(r.count_); - new_row->set_total(r.GetTotalAsStr(statistics::TimeUnit::MICRO_SECOND)); - } + // TODO(hw): delete rpc? 
response->set_code(ReturnCode::kOk); } diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index d48f192ae26..84ba39fdc56 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -33,9 +33,9 @@ #include "nameserver/system_table.h" #include "proto/tablet.pb.h" #include "replica/log_replicator.h" -#include "storage/aggregator.h" #include "sdk/sql_cluster_router.h" -#include "statistics/query_response_time/deploy_query_response_time.h" +#include "statistics/query_response_time/deployment_metric_collector.h" +#include "storage/aggregator.h" #include "storage/mem_table.h" #include "storage/mem_table_snapshot.h" #include "tablet/bulk_load_mgr.h" @@ -216,10 +216,10 @@ class TabletImpl : public ::openmldb::api::TabletServer { openmldb::api::QueryResponse* response, Closure* done); void CreateFunction(RpcController* controller, const openmldb::api::CreateFunctionRequest* request, - openmldb::api::CreateFunctionResponse* response, Closure* done); + openmldb::api::CreateFunctionResponse* response, Closure* done); void DropFunction(RpcController* controller, const openmldb::api::DropFunctionRequest* request, - openmldb::api::DropFunctionResponse* response, Closure* done); + openmldb::api::DropFunctionResponse* response, Closure* done); void SubQuery(RpcController* controller, const openmldb::api::QueryRequest* request, openmldb::api::QueryResponse* response, Closure* done); @@ -281,9 +281,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { public: explicit UpdateAggrClosure(const std::function& callback) : callback_(callback) {} - void Run() override { - callback_(); - } + void Run() override { callback_(); } private: std::function callback_; @@ -320,23 +318,20 @@ class TabletImpl : public ::openmldb::api::TabletServer { void DumpIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table, std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot, - uint32_t partition_num, - const 
std::vector<::openmldb::common::ColumnKey>& column_keys, + uint32_t partition_num, const std::vector<::openmldb::common::ColumnKey>& column_keys, uint64_t offset, std::shared_ptr<::openmldb::api::TaskInfo> task); void SendIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table, const std::map& pid_endpoint_map, std::shared_ptr<::openmldb::api::TaskInfo> task); - void LoadIndexDataInternal(uint32_t tid, uint32_t pid, uint32_t cur_pid, - uint32_t partition_num, uint64_t last_time, - std::shared_ptr<::openmldb::api::TaskInfo> task); + void LoadIndexDataInternal(uint32_t tid, uint32_t pid, uint32_t cur_pid, uint32_t partition_num, uint64_t last_time, + std::shared_ptr<::openmldb::api::TaskInfo> task); void ExtractIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table, - std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot, - const std::vector<::openmldb::common::ColumnKey>& column_key, - uint32_t partition_num, uint64_t offset, bool contain_dump, - std::shared_ptr<::openmldb::api::TaskInfo> task); + std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot, + const std::vector<::openmldb::common::ColumnKey>& column_key, uint32_t partition_num, + uint64_t offset, bool contain_dump, std::shared_ptr<::openmldb::api::TaskInfo> task); void SchedMakeSnapshot(); @@ -360,7 +355,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { int LoadTableInternal(uint32_t tid, uint32_t pid, std::shared_ptr<::openmldb::api::TaskInfo> task_ptr); int LoadDiskTableInternal(uint32_t tid, uint32_t pid, const ::openmldb::api::TableMeta& table_meta, - std::shared_ptr<::openmldb::api::TaskInfo> task_ptr); + std::shared_ptr<::openmldb::api::TaskInfo> task_ptr); int WriteTableMeta(const std::string& path, const ::openmldb::api::TableMeta* table_meta); int UpdateTableMeta(const std::string& path, ::openmldb::api::TableMeta* table_meta, bool for_add_column); @@ -373,8 +368,7 @@ class TabletImpl : public ::openmldb::api::TabletServer 
{ void SetTaskStatus(std::shared_ptr<::openmldb::api::TaskInfo>& task_ptr, // NOLINT ::openmldb::api::TaskStatus status); - int GetTaskStatus(const std::shared_ptr<::openmldb::api::TaskInfo>& task_ptr, - ::openmldb::api::TaskStatus* status); + int GetTaskStatus(const std::shared_ptr<::openmldb::api::TaskInfo>& task_ptr, ::openmldb::api::TaskStatus* status); bool IsExistTaskUnLock(const ::openmldb::api::TaskInfo& task); @@ -400,8 +394,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { bool GetTableRootSize(uint32_t tid, uint32_t pid, const ::openmldb::common::StorageMode& mode, uint64_t& size); // NOLINT - int32_t GetSnapshotOffset(uint32_t tid, uint32_t pid, - ::openmldb::common::StorageMode storageMode, + int32_t GetSnapshotOffset(uint32_t tid, uint32_t pid, ::openmldb::common::StorageMode storageMode, std::string& msg, // NOLINT uint64_t& term, uint64_t& offset); // NOLINT @@ -411,9 +404,10 @@ class TabletImpl : public ::openmldb::api::TabletServer { bool GetRealEp(uint64_t tid, uint64_t pid, std::map* real_ep_map); - void ProcessQuery(RpcController* controller, const openmldb::api::QueryRequest* request, + void ProcessQuery(bool is_sub, RpcController* controller, const openmldb::api::QueryRequest* request, ::openmldb::api::QueryResponse* response, butil::IOBuf* buf); - void ProcessBatchRequestQuery(RpcController* controller, const openmldb::api::SQLBatchRequestQueryRequest* request, + void ProcessBatchRequestQuery(bool is_sub, RpcController* controller, + const openmldb::api::SQLBatchRequestQueryRequest* request, openmldb::api::SQLBatchRequestQueryResponse* response, butil::IOBuf& buf); // NOLINT @@ -421,11 +415,9 @@ class TabletImpl : public ::openmldb::api::TabletServer { const ::openmldb::storage::Dimensions& dimensions, uint64_t log_offset); bool CreateAggregatorInternal(const ::openmldb::api::CreateAggregatorRequest* request, - std::string& msg); //NOLINT + std::string& msg); // NOLINT - inline bool IsClusterMode() const { - return startup_mode_ 
== ::openmldb::type::StartupMode::kCluster; - } + inline bool IsClusterMode() const { return startup_mode_ == ::openmldb::type::StartupMode::kCluster; } std::string GetDBPath(const std::string& root_path, uint32_t tid, uint32_t pid); @@ -460,10 +452,8 @@ class TabletImpl : public ::openmldb::api::TabletServer { std::set sync_snapshot_set_; std::map> file_receiver_map_; BulkLoadMgr bulk_load_mgr_; - std::map<::openmldb::common::StorageMode, std::vector> - mode_root_paths_; - std::map<::openmldb::common::StorageMode, std::vector> - mode_recycle_root_paths_; + std::map<::openmldb::common::StorageMode, std::vector> mode_root_paths_; + std::map<::openmldb::common::StorageMode, std::vector> mode_recycle_root_paths_; std::atomic follower_; std::shared_ptr> real_ep_map_; // thread safe @@ -482,7 +472,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { std::shared_ptr> global_variables_; - std::unique_ptr deploy_collector_; + std::unique_ptr deploy_collector_; std::atomic memory_used_ = 0; };