From a966e66bb1911e798dd70736de703f06c558140b Mon Sep 17 00:00:00 2001 From: HuangWei Date: Thu, 16 Nov 2023 11:55:43 +0800 Subject: [PATCH] feat: p99 metric for deployment and apiserver (#3588) --- docs/zh/maintain/monitoring.md | 32 ++- release/conf/apiserver.flags.template | 2 + release/conf/tablet.flags.template | 2 + src/apiserver/api_server_impl.cc | 22 +- src/apiserver/api_server_impl.h | 18 +- src/apiserver/api_server_test.cc | 5 +- src/apiserver/interface_provider.cc | 7 + src/apiserver/interface_provider.h | 6 +- src/client/tablet_client.cc | 2 +- src/cmd/openmldb.cc | 2 +- src/cmd/sql_cmd_test.cc | 113 +--------- src/nameserver/name_server_impl.cc | 193 ------------------ src/nameserver/name_server_impl.h | 5 - src/nameserver/name_server_test.cc | 2 - .../query_response_time/CMakeLists.txt | 7 +- .../deployment_metric_collector.cc | 43 ++++ .../deployment_metric_collector.h | 81 ++++++++ .../deployment_metric_collector_test.cc | 134 ++++++++++++ src/tablet/tablet_impl.cc | 68 +++--- src/tablet/tablet_impl.h | 54 ++--- 20 files changed, 385 insertions(+), 413 deletions(-) create mode 100644 src/statistics/query_response_time/deployment_metric_collector.cc create mode 100644 src/statistics/query_response_time/deployment_metric_collector.h create mode 100644 src/statistics/query_response_time/deployment_metric_collector_test.cc diff --git a/docs/zh/maintain/monitoring.md b/docs/zh/maintain/monitoring.md index 905644c74df..e51f0a3b8bc 100644 --- a/docs/zh/maintain/monitoring.md +++ b/docs/zh/maintain/monitoring.md @@ -31,10 +31,8 @@ OpenMLDB exporter 是以 Python 实现的 Prometheus exporter,核心是通过 2. 
启动 OpenMLDB - 参见 [install_deploy](../deploy/install_deploy.md) 如何搭建 OpenMLDB。组件启动时需要保证有 flag `--enable_status_service=true`, 或者确认启动 flag 文件 (`conf/(tablet|nameserver).flags`) 中有 `--enable_status_service=true`。 + 参见 [install_deploy](../deploy/install_deploy.md) 如何搭建 OpenMLDB。组件启动时需要保证有 flag `--enable_status_service=true`, OpenMLDB启动脚本(无论是sbin或bin)都已配置为true,如果你使用个人方式启动,需要保证启动 flag 文件 (`conf/(tablet|nameserver).flags`) 中有 `--enable_status_service=true`。 - 默认启动脚本 `bin/start.sh` 开启了 server status, 不需要额外配置。 - 3. 注意:合理选择 OpenMLDB 各组件和 OpenMLDB exporter, 以及 Prometheus, Grafana 的绑定 IP 地址,确保 Grafana 可以访问到 Prometheus, 并且 Prometheus,OpenMLDB exporter 和 OpenMLDB 各个组件之间可以相互访问。 ### 部署 OpenMLDB exporter @@ -168,13 +166,6 @@ OpenMLDB 提供了 Prometheus 和 Grafana 配置文件以作参考,详见 [Ope - component status: 集群组件状态 - table status: 数据库表相关信息,如 `rows_count`, `memory_bytes` - - deploy query response time: deployment query 在 tablet 内部的运行时间 - - **除了 deploy query response time 指标外, 成功配置监控之后都可以直接查询到指标. Deploy query response time 需要全局变量 `deploy_stats` 开启后才会有数据, 在 OpenMLDB CLI 中输入 SQL:** - - ```sql - SET GLOBAL deploy_stats = 'on'; - ``` 你可以通过 @@ -184,9 +175,27 @@ OpenMLDB 提供了 Prometheus 和 Grafana 配置文件以作参考,详见 [Ope 查看完整 DB-Level 指标和帮助信息。 +由Component-Level 指标经Grafana聚合得到的DB-Level 指标(未单独声明时,time单位为us): + +- deploy query response time: deployment query 在OpenMLDB内部的运行时间,按DB.DEPLOYMENT汇总 + **需要全局变量 `deploy_stats` 开启后才会开始统计, 在 OpenMLDB CLI 中输入 SQL:** + + ```sql + SET GLOBAL deploy_stats = 'on'; + ``` + 然后,还需要执行deployment,才会出现相应的指标。 + 如果SET变量为off,会清空server中的所有deployment指标并停止统计(已被Prometheus抓取的数据不受影响)。 + - count:count类统计值从deploy_stats on时开始统计,不区分请求的成功和失败。 + - latency, qps:这类指标只统计`[current_time - interval, current_time]`时间窗口内的数据,interval由Tablet Server配置项`bvar_dump_interval`配置,默认为75秒。 + +- api server http time: 各API接口的处理耗时(不包含route),只监测接口耗时,不做细粒度区分,目前也不通过Grafana展示,可以通过Prometheus手动查询。目前监测`deployment`、`sp`和`query`三种方法。 + - api server route time: APIServer进行http route的耗时,通常为us级别,一般忽略不计 + +以上聚合指标的获取方式见下文。在组件指标中,deploy 
query response time关键字为`deployment`,api server http time关键字为`http_method`。如果指标展示不正常,可以查询组件指标定位问题。 + ### 2. Component-Level 指标 -OpenMLDB 的相关组件(即 nameserver, tablet, etc), 本身作为 BRPC server,暴露了 [Prometheus 相关指标](https://github.com/apache/incubator-brpc/blob/master/docs/en/bvar.md#export-to-prometheus), 只需要配置 Prometheus server 从对应地址拉取指标即可。对应 `prometheus_example.yml`中 `job_name=openmldb_components` 项: +OpenMLDB 的相关组件(即 nameserver, tablet, etc), 本身作为 BRPC server,暴露了 [Prometheus 相关指标](https://github.com/apache/brpc/blob/master/docs/en/bvar.md#export-to-prometheus), 只需要配置 Prometheus server 从对应地址拉取指标即可。对应 `prometheus_example.yml`中 `job_name=openmldb_components` 项: ```yaml - job_name: openmldb_components @@ -203,6 +212,7 @@ OpenMLDB 的相关组件(即 nameserver, tablet, etc), 本身作为 BRPC serve - BRPC server 进程相关信息 - 对应 BRPC server 定义的 RPC method 相关指标,例如该 RPC 的请求 `count`, `error_count`, `qps` 和 `response_time` + - Deployment 相关指标,分deployment统计,但只统计该tablet上的deployment请求。它们将通过Grafana聚合,形成最终的集群级别Deployment指标。 通过 diff --git a/release/conf/apiserver.flags.template b/release/conf/apiserver.flags.template index 5429b305c3a..df0735c0fb5 100644 --- a/release/conf/apiserver.flags.template +++ b/release/conf/apiserver.flags.template @@ -9,3 +9,5 @@ --log_level=info #--thread_pool_size=16 +--bvar_max_dump_multi_dimension_metric_number=10 +--bvar_dump_interval=75 \ No newline at end of file diff --git a/release/conf/tablet.flags.template b/release/conf/tablet.flags.template index 29e0bd7d374..d5109a9abaf 100644 --- a/release/conf/tablet.flags.template +++ b/release/conf/tablet.flags.template @@ -99,3 +99,5 @@ # turn this option on to export openmldb metric status # --enable_status_service=false +--bvar_max_dump_multi_dimension_metric_number=10 +--bvar_dump_interval=75 diff --git a/src/apiserver/api_server_impl.cc b/src/apiserver/api_server_impl.cc index acd6ce24517..cb13414798f 100644 --- a/src/apiserver/api_server_impl.cc +++ b/src/apiserver/api_server_impl.cc @@ -22,11 +22,18 @@ #include #include 
"apiserver/interface_provider.h" + +#include "absl/cleanup/cleanup.h" #include "brpc/server.h" +#include "butil/time.h" namespace openmldb { namespace apiserver { +APIServerImpl::APIServerImpl(const std::string& endpoint) + : md_recorder_("rpc_server_" + endpoint.substr(endpoint.find(":") + 1), "http_method", {"method"}), + provider_("rpc_server_" + endpoint.substr(endpoint.find(":") + 1)) {} + APIServerImpl::~APIServerImpl() = default; bool APIServerImpl::Init(const sdk::ClusterOptions& options) { @@ -72,7 +79,6 @@ void APIServerImpl::Process(google::protobuf::RpcController* cntl_base, const Ht google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); auto* cntl = dynamic_cast(cntl_base); - // The unresolved path has no slashes at the beginning(guaranteed by brpc), it's not good for url parsing auto unresolved_path = "/" + cntl->http_request().unresolved_path(); auto method = cntl->http_request().method(); @@ -81,7 +87,6 @@ void APIServerImpl::Process(google::protobuf::RpcController* cntl_base, const Ht JsonWriter writer; provider_.handle(unresolved_path, method, req_body, writer); - cntl->response_attachment().append(writer.GetString()); } @@ -110,6 +115,12 @@ std::map mode_map{ void APIServerImpl::RegisterQuery() { provider_.post("/dbs/:db_name", [this](const InterfaceProvider::Params& param, const butil::IOBuf& req_body, JsonWriter& writer) { + auto start = absl::Now(); + absl::Cleanup method_latency = [this, start]() { + // TODO(hw): query should split into async/sync, online/offline? 
+ absl::Duration time = absl::Now() - start; + *md_recorder_.get_stats({"query"}) << absl::ToInt64Microseconds(time); + }; auto resp = GeneralResp(); auto db_it = param.find("db_name"); if (db_it == param.end()) { @@ -303,7 +314,7 @@ absl::Status APIServerImpl::JsonMap2SQLRequestRow(const Value& non_common_cols_v if (sch->GetColumnType(i) == hybridse::sdk::kTypeString) { auto v = non_common_cols_v.FindMember(sch->GetColumnName(i).c_str()); if (v == non_common_cols_v.MemberEnd()) { - return absl::InvalidArgumentError("can't find " + sch->GetColumnName(i)); + return absl::InvalidArgumentError("can't find col " + sch->GetColumnName(i)); } str_len_sum += v->value.GetStringLength(); } @@ -421,6 +432,11 @@ void APIServerImpl::RegisterExecSP() { void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvider::Params& param, const butil::IOBuf& req_body, JsonWriter& writer) { + auto start = absl::Now(); + absl::Cleanup method_latency = [this, start, has_common_col]() { + absl::Duration time = absl::Now() - start; + *md_recorder_.get_stats({has_common_col ? "sp" : "deployment"}) << absl::ToInt64Microseconds(time); + }; auto resp = GeneralResp(); auto db_it = param.find("db_name"); auto sp_it = param.find("sp_name"); diff --git a/src/apiserver/api_server_impl.h b/src/apiserver/api_server_impl.h index f2b9741cb07..ee41e34935b 100644 --- a/src/apiserver/api_server_impl.h +++ b/src/apiserver/api_server_impl.h @@ -32,6 +32,10 @@ #include "sdk/sql_cluster_router.h" #include "sdk/sql_request_row.h" +#include "absl/status/status.h" +#include "bvar/bvar.h" +#include "bvar/multi_dimension.h" // latency recorder + namespace openmldb { namespace apiserver { @@ -45,7 +49,7 @@ using rapidjson::Value; // Both input and output are json data. We use rapidjson to handle it. 
class APIServerImpl : public APIServer { public: - APIServerImpl() = default; + explicit APIServerImpl(const std::string& endpoint); ~APIServerImpl() override; bool Init(const sdk::ClusterOptions& options); bool Init(::openmldb::sdk::DBSDK* cluster); @@ -82,13 +86,19 @@ class APIServerImpl : public APIServer { // may get segmentation fault when throw boost::bad_lexical_cast, so we use std::from_chars template static bool FromString(const std::string& s, T& value) { // NOLINT - auto res = std::from_chars(s.data(), s.data() + s.size(), value); - return res.ec == std::errc() && (res.ptr - s.data() == s.size()); + if (auto res = std::from_chars(s.data(), s.data() + s.size(), value); res.ec == std::errc()) { + auto len = res.ptr - s.data(); + return len >= 0 ? (uint64_t)len == s.size() : false; + } else { + return false; + } } private: - std::shared_ptr sql_router_; + bvar::MultiDimension md_recorder_; InterfaceProvider provider_; + + std::shared_ptr sql_router_; // cluster_sdk_ is not owned by this class. 
::openmldb::sdk::DBSDK* cluster_sdk_ = nullptr; }; diff --git a/src/apiserver/api_server_test.cc b/src/apiserver/api_server_test.cc index f327ff89527..6abe8ddd051 100644 --- a/src/apiserver/api_server_test.cc +++ b/src/apiserver/api_server_test.cc @@ -50,7 +50,7 @@ class APIServerTestEnv : public testing::Environment { // Owned by queue_svc cluster_sdk = new ::openmldb::sdk::ClusterSDK(cluster_options); ASSERT_TRUE(cluster_sdk->Init()) << "Fail to connect to db"; - queue_svc = std::make_shared(); + queue_svc = std::make_shared("127.0.0.1:8010"); // fake endpoint for metrics ASSERT_TRUE(queue_svc->Init(cluster_sdk)); sdk::SQLRouterOptions sql_opt; @@ -1236,7 +1236,8 @@ TEST_F(APIServerTest, jsonInput) { ASSERT_FALSE(show_cntl.Failed()) << show_cntl.ErrorText(); LOG(INFO) << "get sp resp: " << show_cntl.response_attachment(); - // call deployment in json style input(won't check if it's a sp or deployment) + // call sp in deployment api with json style input(won't check if it's a sp or deployment), so it'll have field + // `common_cols_data` { brpc::Controller cntl; cntl.http_request().set_method(brpc::HTTP_METHOD_POST); diff --git a/src/apiserver/interface_provider.cc b/src/apiserver/interface_provider.cc index c4be99c4726..4a482b3aab0 100644 --- a/src/apiserver/interface_provider.cc +++ b/src/apiserver/interface_provider.cc @@ -25,6 +25,8 @@ #include #include "boost/algorithm/string/split.hpp" +#include "butil/time.h" +#include "bvar/bvar.h" #include "glog/logging.h" namespace openmldb { @@ -159,6 +161,8 @@ void InterfaceProvider::registerRequest(brpc::HttpMethod type, std::string const bool InterfaceProvider::handle(const std::string& path, const brpc::HttpMethod& method, const butil::IOBuf& req_body, JsonWriter& writer) { + butil::Timer tm; + tm.start(); auto err = GeneralResp(); Url url; @@ -190,6 +194,9 @@ bool InterfaceProvider::handle(const std::string& path, const brpc::HttpMethod& } auto params = extractParameters(url, request->url); + tm.stop(); + 
route_recorder_ << tm.u_elapsed(); + request->callback(params, req_body, writer); return true; } diff --git a/src/apiserver/interface_provider.h b/src/apiserver/interface_provider.h index e2ffe4661a4..8589b9229c5 100644 --- a/src/apiserver/interface_provider.h +++ b/src/apiserver/interface_provider.h @@ -32,6 +32,7 @@ #include "apiserver/json_helper.h" #include "brpc/http_method.h" // HttpMethod #include "butil/iobuf.h" // IOBuf +#include "bvar/bvar.h" // latency recorder #include "proto/api_server.pb.h" namespace openmldb { @@ -111,7 +112,7 @@ class ReducedUrlParser { class InterfaceProvider { public: - InterfaceProvider() = default; + explicit InterfaceProvider(const std::string& metric_prefix) : route_recorder_(metric_prefix, "http_route") {} InterfaceProvider& operator=(InterfaceProvider const&) = delete; InterfaceProvider(InterfaceProvider const&) = delete; @@ -160,6 +161,9 @@ class InterfaceProvider { void registerRequest(brpc::HttpMethod, const std::string& path, std::function&& callback); private: + // we only record route latency, method latency is recorded in callback(you may need record in parts), defined in + // api server impl + bvar::LatencyRecorder route_recorder_; std::unordered_map> requests_; }; diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 3a51a7e8f94..2049279d17f 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -1171,7 +1171,7 @@ bool TabletClient::SubBatchRequestQuery(const ::openmldb::api::SQLBatchRequestQu if (callback == nullptr) { return false; } - return client_.SendRequest(&::openmldb::api::TabletServer_Stub::SQLBatchRequestQuery, + return client_.SendRequest(&::openmldb::api::TabletServer_Stub::SubBatchRequestQuery, callback->GetController().get(), &request, callback->GetResponse().get(), callback); } diff --git a/src/cmd/openmldb.cc b/src/cmd/openmldb.cc index 8c07cb252da..328f4ff342b 100644 --- a/src/cmd/openmldb.cc +++ b/src/cmd/openmldb.cc @@ -3853,7 +3853,7 @@ void 
StartAPIServer() { GetRealEndpoint(&real_endpoint); } - auto api_service = std::make_unique<::openmldb::apiserver::APIServerImpl>(); + auto api_service = std::make_unique<::openmldb::apiserver::APIServerImpl>(real_endpoint); if (!FLAGS_nameserver.empty()) { std::vector vec; boost::split(vec, FLAGS_nameserver, boost::is_any_of(":")); diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index a1637e369d9..459aef2971e 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -3777,7 +3777,7 @@ TEST_F(SqlCmdTest, SelectWithAddNewIndex) { // -------------------------------------------------------------------------------------- // basic functional UTs to test if it is correct for deploy query response time collection -// see NameServerImpl::SyncDeployStats & TabletImpl::TryCollectDeployStats +// see TabletImpl::CollectDeployStats // -------------------------------------------------------------------------------------- // a proxy class to create and cleanup deployment stats more gracefully @@ -3850,12 +3850,6 @@ struct DeploymentEnv { ASSERT_TRUE(status.IsOK()) << status.msg << "\n" << status.trace; } - void EnableDeployStats() { - ProcessSQLs(sr_, { - "set global deploy_stats = 'on'", - }); - } - sdk::SQLClusterRouter* sr_; absl::BitGen gen_; // variables generate randomly in SetUp @@ -3882,111 +3876,6 @@ struct DeploymentEnv { } }; -static const char QueryDeployResponseTime[] = "select * from INFORMATION_SCHEMA.DEPLOY_RESPONSE_TIME"; - -TEST_P(DBSDKTest, DeployStatsNotEnableByDefault) { - auto cli = GetParam(); - cs = cli->cs; - sr = cli->sr; - - DeploymentEnv env(sr); - env.SetUp(); - env.CallDeployProcedureBatch(); - env.CallDeployProcedure(); - - absl::SleepFor(absl::Seconds(3)); - - hybridse::sdk::Status status; - auto rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(0, rs->Size()); - - env.EnableDeployStats(); - - absl::SleepFor(absl::Seconds(3)); - - // HandleSQL exists only 
for purpose of printing - HandleSQL(QueryDeployResponseTime); - rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(0, rs->Size()); -} - -TEST_P(DBSDKTest, DeployStatsEnabledAfterSetGlobal) { - auto cli = GetParam(); - cs = cli->cs; - sr = cli->sr; - - // FIXME(#1547): test skiped due to Deploy Response Time can't enable in standalone mode - if (cs->IsClusterMode()) { - DeploymentEnv env(sr); - env.SetUp(); - env.EnableDeployStats(); - // sleep a while for global variable notification - absl::SleepFor(absl::Seconds(2)); - - hybridse::sdk::Status status; - auto rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - // as deploy stats in tablet is lazy managed, the deploy stats will stay empty util the first procedure call - // happens - ASSERT_EQ(0, rs->Size()); - - // warm up deploy stats - env.CallDeployProcedureBatch(); - env.CallDeployProcedure(); - - absl::SleepFor(absl::Seconds(3)); - - HandleSQL(QueryDeployResponseTime); - rs = sr->ExecuteSQLParameterized("", QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(TIME_DISTRIBUTION_BUCKET_COUNT, rs->Size()); - - int cnt = 0; - while (rs->Next()) { - EXPECT_EQ(absl::StrCat(env.db_, ".", env.dp_name_), rs->GetAsStringUnsafe(0)); - cnt += rs->GetInt32Unsafe(2); - } - EXPECT_EQ(2, cnt); - } -} - -TEST_P(DBSDKTest, DeployStatsOnlyCollectDeployProcedure) { - auto cli = GetParam(); - cs = cli->cs; - sr = cli->sr; - if (cs->IsClusterMode()) { - DeploymentEnv env(sr); - env.SetUp(); - - env.EnableDeployStats(); - absl::SleepFor(absl::Seconds(2)); - - for (int i = 0; i < 5; ++i) { - env.CallProcedure(); - } - - for (int i = 0; i < 10; ++i) { - env.CallDeployProcedureBatch(); - env.CallDeployProcedure(); - } - absl::SleepFor(absl::Seconds(3)); - - HandleSQL(QueryDeployResponseTime); - hybridse::sdk::Status status; - auto rs = sr->ExecuteSQLParameterized("", 
QueryDeployResponseTime, {}, &status); - ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(TIME_DISTRIBUTION_BUCKET_COUNT, rs->Size()); - int cnt = 0; - while (rs->Next()) { - EXPECT_EQ(absl::StrCat(env.db_, ".", env.dp_name_), rs->GetAsStringUnsafe(0)); - cnt += rs->GetInt32Unsafe(2); - } - EXPECT_EQ(10 + 10, cnt); - } -} - class StripSpaceTest : public ::testing::TestWithParam> {}; std::vector> strip_cases = { diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index 35d11f4d6ec..06912ad9736 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -74,7 +74,6 @@ DECLARE_int32(make_snapshot_time); DECLARE_int32(make_snapshot_check_interval); DECLARE_bool(use_name); DECLARE_bool(enable_distsql); -DECLARE_uint32(sync_deploy_stats_timeout); namespace openmldb { namespace nameserver { @@ -1473,8 +1472,6 @@ bool NameServerImpl::Init(const std::string& zk_cluster, const std::string& zk_p task_vec_.resize(FLAGS_name_server_task_max_concurrency + FLAGS_name_server_task_concurrency_for_replica_cluster); task_thread_pool_.DelayTask(FLAGS_make_snapshot_check_interval, boost::bind(&NameServerImpl::SchedMakeSnapshot, this)); - task_thread_pool_.DelayTask(FLAGS_sync_deploy_stats_timeout, - boost::bind(&NameServerImpl::ScheduleSyncDeployStats, this)); return true; } @@ -9394,21 +9391,6 @@ void NameServerImpl::DropProcedureOnTablet(const std::string& db_name, const std } } - bool is_deployment_procedure = false; - { - std::lock_guard lock(mu_); - auto it = db_sp_info_map_.find(db_name); - if (it != db_sp_info_map_.end()) { - auto iit = it->second.find(sp_name); - is_deployment_procedure = iit != it->second.end() && iit->second->type() == type::kReqDeployment; - } - } - std::shared_ptr info; - auto success = GetTableInfo(DEPLOY_RESPONSE_TIME, INFORMATION_SCHEMA_DB, &info); - // NOTE: deploy stats records will delete even when global setting deploy_stats is turned off while there are - // records for previous deploy query 
(during deploy_stats = 'on') - bool drop_deploy_stats = is_deployment_procedure && success && info != nullptr; - for (auto tb_client : tb_client_vec) { if (!tb_client->DropProcedure(db_name, sp_name)) { PDLOG(WARNING, "drop procedure on tablet failed. db_name[%s], sp_name[%s], endpoint[%s]", db_name.c_str(), @@ -9419,44 +9401,6 @@ void NameServerImpl::DropProcedureOnTablet(const std::string& db_name, const std PDLOG(INFO, "drop procedure on tablet success. db_name[%s], sp_name[%s], endpoint[%s]", db_name.c_str(), sp_name.c_str(), tb_client->GetEndpoint().c_str()); } - - if (drop_deploy_stats && info->table_partition_size() > 0) { - std::string endpoint; - for (auto& meta : info->table_partition()[0].partition_meta()) { - if (meta.is_leader()) { - endpoint = meta.endpoint(); - } - } - auto tablet_info = GetTablet(endpoint); - if (tablet_info == nullptr) { - PDLOG(ERROR, "no leader exists for system table %s", DEPLOY_RESPONSE_TIME); - return; - } - - auto tb_client = tablet_info->client_; - - auto deploy_name = absl::StrCat(db_name, ".", sp_name); - uint32_t pid = static_cast(::openmldb::base::hash64(deploy_name) % info->table_partition_size()); - auto time = absl::Microseconds(1); - int cnt = 0; - std::string msg; - while (cnt++ < TIME_DISTRIBUTION_BUCKET_COUNT - 1) { - auto key = absl::StrCat(deploy_name, "|", statistics::GetDurationAsStr(time, statistics::TimeUnit::SECOND)); - if (!tb_client->Delete(info->tid(), pid, key, "", msg)) { - // NOTE: some warning appears but is expected, just ingore: - // 1. when you create a deploy query but not call it any time before delete it - // 2. 
deploy_stats is always turned off - PDLOG(WARNING, "failed to delete entry in %s in tablet %s where key = %s : %s", DEPLOY_RESPONSE_TIME, - tb_client->GetEndpoint(), key, msg); - } - time *= 10; - } - auto key = absl::StrCat(deploy_name, "|", MAX_STRING); - if (!tb_client->Delete(info->tid(), pid, key, "", msg)) { - PDLOG(WARNING, "failed to delete entry in %s in tablet %s where key = %s : %s", DEPLOY_RESPONSE_TIME, - tb_client->GetEndpoint(), key, msg); - } - } } void NameServerImpl::DropProcedure(RpcController* controller, const api::DropProcedureRequest* request, @@ -9974,143 +9918,6 @@ base::Status NameServerImpl::InitGlobalVarTable() { return {}; } -static const std::string& QueryDeployStats() { - static const std::string query_deploy_stats = - absl::StrCat("select * from ", nameserver::INFORMATION_SCHEMA_DB, ".", nameserver::DEPLOY_RESPONSE_TIME); - return query_deploy_stats; -} - -static const std::string& QueryDeployStatsIsOn() { - static const std::string query_deploy_stats_is_on = - absl::StrCat("select * from ", nameserver::INFORMATION_SCHEMA_DB, ".", nameserver::GLOBAL_VARIABLES, - " where Variable_name = 'deploy_stats'"); - return query_deploy_stats_is_on; -} - -void NameServerImpl::SyncDeployStats() { - // Step one: check condition for deploy stats. only all of those meet: - // 1. current ns is master - // 2. 
deploy_stats global variable is set to 'on' or 'true' - if (startup_mode_ == type::kStandalone && running_.load(std::memory_order_acquire)) { - DLOG(INFO) << "sync deploy stats skipped for non-leader ns on cluster"; - FreeSdkConnection(); - return; - } - - if (!GetSdkConnection()) { - LOG(ERROR) << "failed to get sdk connection"; - return; - } - auto sr = std::atomic_load_explicit(&sr_, std::memory_order_acquire); - if (sr == nullptr) { - LOG(ERROR) << "sdk connection is null"; - return; - } - ::hybridse::sdk::Status s; - auto rs = sr->ExecuteSQLParameterized("", QueryDeployStatsIsOn(), {}, &s); - if (!s.IsOK()) { - LOG(ERROR) << "[ERROR] query global variable deploy_stats: " << s.msg; - return; - } - - bool sync_stats = false; - while (rs->Next()) { - auto val = rs->GetStringUnsafe(1); - sync_stats = (val == "on" || val == "true"); - } - - if (!sync_stats) { - DLOG(INFO) << "sync deploy stats skipped when deploy_stats is off"; - return; - } - - // Step two: Fetch And Flush deploy stats from each tablet - std::unordered_map> active_tablets; - { - std::lock_guard lock(mu_); - for (auto& kv : tablets_) { - if (kv.second->Health()) { - active_tablets.emplace(kv.first, kv.second->client_); - } - } - } - statistics::DeployResponseTimeRowReducer reducer; - for (auto& client : active_tablets) { - ::openmldb::api::DeployStatsResponse res; - if (!client.second->GetAndFlushDeployStats(&res)) { - LOG(ERROR) << "GetAndFlushDeployStats from " << client.first << " failed "; - continue; - } - - for (auto& r : res.rows()) { - reducer.Reduce(r.deploy_name(), - statistics::ParseDurationFromStr(r.time(), statistics::TimeUnit::MICRO_SECOND), r.count(), - statistics::ParseDurationFromStr(r.total(), statistics::TimeUnit::MICRO_SECOND)); - } - } - - // Step three: Query old deploy response time from table - rs = sr->ExecuteSQLParameterized("", QueryDeployStats(), {}, &s); - if (!s.IsOK()) { - LOG(ERROR) << "[ERROR] querying DEPLOY_RESPONSE_TIME" << s.msg; - return; - } - - // old reducer 
help find those rows already in table and but there is - // no new incremental stats reduced from all tablets - statistics::DeployResponseTimeRowReducer old_reducer; - while (rs->Next()) { - auto name = rs->GetAsStringUnsafe(0); - auto time = rs->GetAsStringUnsafe(1); - uint64_t cnt = 0; - if (rs->GetSchema()->GetColumnType(2) == hybridse::sdk::DataType::kTypeInt64) { - cnt = static_cast(rs->GetInt64Unsafe(2)); - } else { - cnt = static_cast(rs->GetInt32Unsafe(2)); - } - auto total = rs->GetAsStringUnsafe(3); - - auto ts = statistics::ParseDurationFromStr(time, statistics::TimeUnit::SECOND); - auto tt = statistics::ParseDurationFromStr(total, statistics::TimeUnit::SECOND); - - reducer.Reduce(name, ts, cnt, tt); - old_reducer.Reduce(name, ts, cnt, tt); - } - - // Step four: update DEPLOY_RESPONSE_TIME table by the new rows - // only for rows that meet any of conditiions below: - // 1. the incremental count and total is bigger than 0 - // 2. original table do not have row for that (deploy_name + time) key yet - std::string insert_deploy_stat = absl::StrCat("insert into ", nameserver::INFORMATION_SCHEMA_DB, ".", - nameserver::DEPLOY_RESPONSE_TIME, " values "); - for (auto& row : reducer.Rows()) { - auto old_it = old_reducer.Find(row->deploy_name_, row->time_); - if (old_it != nullptr && row->count_ == old_it->count_) { - // don't update table only if there is no incremental data, and there is record already in table - continue; - } - - std::string time = row->GetTimeAsStr(statistics::TimeUnit::SECOND); - auto insert_sql = absl::StrCat(insert_deploy_stat, " ( '", row->deploy_name_, "', '", - time, "', ", row->count_, - ",'", row->GetTotalAsStr(statistics::TimeUnit::SECOND), "' )"); - - hybridse::sdk::Status st; - DLOG(INFO) << "sync deploy stats: executing sql: " << insert_sql; - sr->ExecuteInsert("", insert_sql, &st); - if (!st.IsOK()) { - LOG(ERROR) << "[ERROR] insert deploy stats failed: " << st.msg; - } - } - // TODO(ace): add logs for summary how many rows 
affected and time cost -} - -void NameServerImpl::ScheduleSyncDeployStats() { - SyncDeployStats(); - task_thread_pool_.DelayTask(FLAGS_sync_deploy_stats_timeout, - boost::bind(&NameServerImpl::ScheduleSyncDeployStats, this)); -} - /// \beirf create a SQLClusterRouter instance for use like monitoring statistics collecting /// the actual instance is stored in `sr_` member /// diff --git a/src/nameserver/name_server_impl.h b/src/nameserver/name_server_impl.h index 593c0bb536f..c8f5c56b04d 100644 --- a/src/nameserver/name_server_impl.h +++ b/src/nameserver/name_server_impl.h @@ -673,11 +673,6 @@ class NameServerImpl : public NameServer { uint64_t GetTerm() const; - // write deploy statistics into table - void SyncDeployStats(); - - void ScheduleSyncDeployStats(); - bool GetSdkConnection(); void FreeSdkConnection(); diff --git a/src/nameserver/name_server_test.cc b/src/nameserver/name_server_test.cc index eee5d79f351..e01f2f5c792 100644 --- a/src/nameserver/name_server_test.cc +++ b/src/nameserver/name_server_test.cc @@ -46,7 +46,6 @@ DECLARE_int32(zk_keep_alive_check_interval); DECLARE_int32(make_snapshot_threshold_offset); DECLARE_uint32(name_server_task_max_concurrency); DECLARE_uint32(system_table_replica_num); -DECLARE_uint32(sync_deploy_stats_timeout); DECLARE_bool(auto_failover); using brpc::Server; @@ -1297,6 +1296,5 @@ int main(int argc, char** argv) { FLAGS_ssd_root_path = tmp_path.GetTempPath("ssd"); FLAGS_hdd_root_path = tmp_path.GetTempPath("hdd"); FLAGS_system_table_replica_num = 0; - FLAGS_sync_deploy_stats_timeout = 1000000; return RUN_ALL_TESTS(); } diff --git a/src/statistics/query_response_time/CMakeLists.txt b/src/statistics/query_response_time/CMakeLists.txt index e309934f318..b03aeef65c5 100644 --- a/src/statistics/query_response_time/CMakeLists.txt +++ b/src/statistics/query_response_time/CMakeLists.txt @@ -12,14 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-set(LINK_LIBS absl::time absl::random_random absl::strings absl::status absl::statusor absl::synchronization ${GTEST_LIBRARIES} ${GLOG_LIBRARY} ${GFLAGS_LIBRARY}) +set(LINK_LIBS absl::time absl::random_random absl::strings absl::status absl::statusor absl::synchronization ${BRPC_LIBS} ${GTEST_LIBRARIES} ${GLOG_LIBRARY} ${GFLAGS_LIBRARY}) link_libraries(${LINK_LIBS}) if(CMAKE_CXX_COMPILER_ID MATCHES "(AppleClang)|(Clang)") add_definitions(-Wthread-safety) endif() -add_library(query_response_time STATIC ${CMAKE_CURRENT_SOURCE_DIR}/deploy_query_response_time.cc ${CMAKE_CURRENT_SOURCE_DIR}/query_response_time.cc) +add_library(query_response_time STATIC ${CMAKE_CURRENT_SOURCE_DIR}/deployment_metric_collector.cc) function(add_test_file TARGET_NAME SOURCE_NAME) add_executable(${TARGET_NAME} ${SOURCE_NAME}) @@ -40,8 +40,7 @@ function(add_test_file TARGET_NAME SOURCE_NAME) endfunction(add_test_file) if(TESTING_ENABLE) - add_test_file(query_response_time_test ${CMAKE_CURRENT_SOURCE_DIR}/query_response_time_test.cc) - add_test_file(deploy_query_response_time_test ${CMAKE_CURRENT_SOURCE_DIR}/deploy_query_response_time_test.cc) + add_test_file(deployment_metric_collector_test ${CMAKE_CURRENT_SOURCE_DIR}/deployment_metric_collector_test.cc) if(CMAKE_PROJECT_NAME STREQUAL "openmldb") set(test_list ${test_list} PARENT_SCOPE) diff --git a/src/statistics/query_response_time/deployment_metric_collector.cc b/src/statistics/query_response_time/deployment_metric_collector.cc new file mode 100644 index 00000000000..252821254b8 --- /dev/null +++ b/src/statistics/query_response_time/deployment_metric_collector.cc @@ -0,0 +1,43 @@ +/* + * Copyright 2022 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "statistics/query_response_time/deployment_metric_collector.h" + +namespace openmldb::statistics { + +absl::Status DeploymentMetricCollector::Collect(const std::string& db, const std::string& deploy_name, + absl::Duration time) { + absl::ReaderMutexLock lock(&mutex_); + auto it = md_recorder_->get_stats({db, deploy_name}); + if (it == nullptr) { + LOG(WARNING) << "reach limit size, collect failed"; + return absl::OutOfRangeError("multi-dimensional recorder reaches limit size, please delete old deploy"); + } + *it << absl::ToInt64Microseconds(time); + return absl::OkStatus(); +} + +absl::Status DeploymentMetricCollector::DeleteDeploy(const std::string& db, const std::string& deploy_name) { + absl::ReaderMutexLock lock(&mutex_); + md_recorder_->delete_stats({db, deploy_name}); + return absl::OkStatus(); +} + +void DeploymentMetricCollector::Reset() { + absl::WriterMutexLock lock(&mutex_); + md_recorder_ = make_shared(prefix_); +} +} // namespace openmldb::statistics diff --git a/src/statistics/query_response_time/deployment_metric_collector.h b/src/statistics/query_response_time/deployment_metric_collector.h new file mode 100644 index 00000000000..9296b5a5fde --- /dev/null +++ b/src/statistics/query_response_time/deployment_metric_collector.h @@ -0,0 +1,81 @@ +/* + * Copyright 2022 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SRC_STATISTICS_QUERY_RESPONSE_TIME_DEPLOYMENT_METRIC_COLLECTOR_H_ +#define SRC_STATISTICS_QUERY_RESPONSE_TIME_DEPLOYMENT_METRIC_COLLECTOR_H_ + +#include +#include +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/status/status.h" +#include "absl/synchronization/mutex.h" +#include "absl/time/time.h" +#include "bvar/bvar.h" +#include "bvar/multi_dimension.h" +#include "gflags/gflags.h" + +namespace openmldb::statistics { +class DeploymentMetricCollector { + public: + typedef typename bvar::MultiDimension MDRecorder; + explicit DeploymentMetricCollector(const std::string& prefix) : prefix_(prefix), md_recorder_(make_shared(prefix)) { + // already expose_as when MultiDimension ctor + } + // collector is not copyable + DeploymentMetricCollector(const DeploymentMetricCollector& c) = delete; + + ~DeploymentMetricCollector() {} + // . 
+ absl::Status Collect(const std::string& db, const std::string& deploy_name, absl::Duration time) + LOCKS_EXCLUDED(mutex_); + absl::Status DeleteDeploy(const std::string& db, const std::string& deploy_name) LOCKS_EXCLUDED(mutex_); + void Reset() LOCKS_EXCLUDED(mutex_); + + // debug helper: an empty key describes the whole recorder, otherwise it dumps the stats recorded for that key + std::string Desc(const std::list& key) LOCKS_EXCLUDED(mutex_) { + absl::ReaderMutexLock lock(&mutex_); + std::stringstream ss; + if (key.empty()) { + md_recorder_->describe(ss); + } else if (md_recorder_->has_stats(key)) { + auto rd = md_recorder_->get_stats(key); + ss << "count:" << rd->count() << ", qps:" << rd->qps() << ", latency:[" << rd->latency() << "," + << rd->latency_percentile(0.8) << "," << rd->latency_percentile(0.9) << "," + << rd->latency_percentile(0.99) << "," << rd->latency_percentile(0.999) << "," + << rd->latency_percentile(0.9999) << "]"; + } else { + ss << "no stats for key"; + } + + return ss.str(); + } + + static std::shared_ptr make_shared(const std::string& prefix) { + MDRecorder::key_type labels = {"db", "deployment"}; + return std::make_shared(prefix, "deployment", labels); + } + + private: + std::string prefix_; // kept so Reset() can rebuild the recorder + // the recorder is not copyable and can't be cleared, so it is held by pointer + // MultiDimension offers no way to set the recorder window size ourselves, bvar_dump_interval is the only way + std::shared_ptr md_recorder_ GUARDED_BY(mutex_); + mutable absl::Mutex mutex_; // protects md_recorder_ +}; +} // namespace openmldb::statistics +#endif // SRC_STATISTICS_QUERY_RESPONSE_TIME_DEPLOYMENT_METRIC_COLLECTOR_H_ diff --git a/src/statistics/query_response_time/deployment_metric_collector_test.cc b/src/statistics/query_response_time/deployment_metric_collector_test.cc new file mode 100644 index 00000000000..06ae0e0ddd4 --- /dev/null +++ b/src/statistics/query_response_time/deployment_metric_collector_test.cc @@ -0,0 +1,134 @@ +/* + * Copyright 2022 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except 
in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "statistics/query_response_time/deployment_metric_collector.h" + +#include +#include +#include +#include +#include + +#include "absl/random/random.h" +#include "absl/strings/str_cat.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "gflags/gflags.h" +#include "glog/logging.h" +#include "gtest/gtest.h" + +namespace bvar { +DECLARE_int32(bvar_dump_interval); +} + +namespace openmldb { +namespace statistics { + +class CollectorTest : public ::testing::Test { + public: + CollectorTest() = default; + ~CollectorTest() override = default; +}; + +using std::ifstream; +using std::string; +using std::ios_base; + +void mem_usage() { + double vm_usage = 0.0; + double resident_set = 0.0; + ifstream stat_stream("/proc/self/stat", ios_base::in); // get info from proc directory + // create some variables to get info + string pid, comm, state, ppid, pgrp, session, tty_nr; + string tpgid, flags, minflt, cminflt, majflt, cmajflt; + string utime, stime, cutime, cstime, priority, nice; + string O, itrealvalue, starttime; + uint64_t vsize; + int64_t rss; + stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt >> + majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> O >> itrealvalue >> starttime >> + vsize >> rss; // don't care about the rest + stat_stream.close(); + int64_t page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // for x86-64 is configured to use 2MB pages + vm_usage = vsize / 1024.0; + resident_set = rss * 
page_size_kb; + LOG(INFO) << "VM: " << vm_usage << "KB; RSS: " << resident_set << "KB"; +} + +TEST_F(CollectorTest, MemoryTest) { + // let's see how much memory will be used + auto test = [](int db_size, int dpl_size, int request_cnt, bool will_fail = false) { + DeploymentMetricCollector collector("test"); + for (int db_idx = 0; db_idx < db_size; db_idx++) { + auto db = "db" + std::to_string(db_idx); + for (int i = 0; i < dpl_size; i++) { + for (int t = 0; t < request_cnt; t++) { + auto st = collector.Collect(db, "d" + std::to_string(i), absl::Microseconds(10)); + } + } + } + if (will_fail) { + return; + } + auto desc = collector.Desc({}); + LOG(INFO) << desc; + ASSERT_TRUE(desc == + absl::StrCat(R"({"name" : "test_deployment", "labels" : ["db", "deployment"], "stats_count" : )", + db_size * dpl_size, "}")) + << desc; + // peek one + auto dstat = collector.Desc({"db0", "d0"}); + LOG(INFO) << dstat; + // can't check qps, and latency is calc from window, it'll == 0 if get too fast, so we just check count + ASSERT_TRUE(dstat.find("count:" + std::to_string(request_cnt)) != std::string::npos) << dstat; + mem_usage(); + }; + // empty mem VM: 104948KB; RSS: 5752KB + mem_usage(); + // recorder mem for one label is stable, VM: ~105576KB; RSS: ~6512KB, even collect 10M requests + // but VM&RSS contains other memory, should use diff to get recorder mem + test(1, 1, 1000); // + ~0.5M + test(1, 1, 10000); + test(1, 1, 100000); + // disable for speed + // test(1, 1, 1000000); + // test(1, 1, 10000000); + + // VM: 105568KB; RSS: 6628KB + // VM: 105964KB; RSS: 6984KB + // VM: 110056KB; RSS: 11208KB + // VM: 341356KB; RSS: 126256KB + // VM: 344076KB; RSS: 129936KB + // test(1, 10, 1000); + // test(1, 100, 1000); + // test(1, 1000, 1000); + // test(1, 10000, 1000); + // test(10, 1000, 1000); + + // MAX_MULTI_DIMENSION_STATS_COUNT = 20000, so total alive deployment count <= 20001 + // If we need more, need a repetitive task to reset (K*bvar_dump_interval?) 
or add LabelLatencyRecorder + test(1, 20002, 1, true); +} + +} // namespace statistics +} // namespace openmldb + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + ::google::ParseCommandLineFlags(&argc, &argv, true); + bvar::FLAGS_bvar_dump_interval = 75; + return RUN_ALL_TESTS(); +} diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index ee0150d4548..4b30036465c 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -189,7 +189,10 @@ bool TabletImpl::Init(const std::string& zk_cluster, const std::string& zk_path, mode_recycle_root_paths_[::openmldb::common::kSSD]); ::openmldb::base::SplitString(FLAGS_recycle_bin_hdd_root_path, ",", mode_recycle_root_paths_[::openmldb::common::kHDD]); - deploy_collector_ = std::make_unique<::openmldb::statistics::DeployQueryTimeCollector>(); + // if want /brpc_metrics, prefix should be g_server_info_prefix+(when no server_info_name), means + // rpc_server_ if standalone, diy + deploy_collector_ = std::make_unique<::openmldb::statistics::DeploymentMetricCollector>( + "rpc_server_" + endpoint.substr(endpoint.find(":") + 1)); if (!zk_cluster.empty()) { zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path, @@ -1574,15 +1577,15 @@ void TabletImpl::Query(RpcController* ctrl, const openmldb::api::QueryRequest* r brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(ctrl); butil::IOBuf& buf = cntl->response_attachment(); - ProcessQuery(ctrl, request, response, &buf); + ProcessQuery(/*is_sub=*/false, ctrl, request, response, &buf); } -void TabletImpl::ProcessQuery(RpcController* ctrl, const openmldb::api::QueryRequest* request, +void TabletImpl::ProcessQuery(bool is_sub, RpcController* ctrl, const openmldb::api::QueryRequest* request, ::openmldb::api::QueryResponse* response, butil::IOBuf* buf) { auto start = absl::Now(); - absl::Cleanup deploy_collect_task = [this, request, start]() { + absl::Cleanup deploy_collect_task = 
[this, is_sub, request, start]() { if (this->IsCollectDeployStatsEnabled()) { - if (request->is_procedure() && request->has_db() && request->has_sp_name()) { + if (!is_sub && request->is_procedure() && request->has_db() && request->has_sp_name()) { this->TryCollectDeployStats(request->db(), request->sp_name(), start); } } @@ -1704,7 +1707,8 @@ void TabletImpl::SubQuery(RpcController* ctrl, const openmldb::api::QueryRequest brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(ctrl); butil::IOBuf& buf = cntl->response_attachment(); - ProcessQuery(ctrl, request, response, &buf); + // subquery don't need to collect deploy stats + ProcessQuery(true, ctrl, request, response, &buf); } void TabletImpl::SQLBatchRequestQuery(RpcController* ctrl, const openmldb::api::SQLBatchRequestQueryRequest* request, @@ -1713,15 +1717,15 @@ void TabletImpl::SQLBatchRequestQuery(RpcController* ctrl, const openmldb::api:: brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(ctrl); butil::IOBuf& buf = cntl->response_attachment(); - return ProcessBatchRequestQuery(ctrl, request, response, buf); + return ProcessBatchRequestQuery(false, ctrl, request, response, buf); } -void TabletImpl::ProcessBatchRequestQuery(RpcController* ctrl, +void TabletImpl::ProcessBatchRequestQuery(bool is_sub, RpcController* ctrl, const openmldb::api::SQLBatchRequestQueryRequest* request, openmldb::api::SQLBatchRequestQueryResponse* response, butil::IOBuf& buf) { absl::Time start = absl::Now(); - absl::Cleanup deploy_collect_task = [this, request, start]() { + absl::Cleanup deploy_collect_task = [this, is_sub, request, start]() { if (this->IsCollectDeployStatsEnabled()) { - if (request->is_procedure() && request->has_db() && request->has_sp_name()) { + if (!is_sub && request->is_procedure() && request->has_db() && request->has_sp_name()) { this->TryCollectDeployStats(request->db(), request->sp_name(), start); } } @@ -1893,7 +1897,7 @@ void 
TabletImpl::SubBatchRequestQuery(RpcController* ctrl, const openmldb::api:: brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(ctrl); butil::IOBuf& buf = cntl->response_attachment(); - return ProcessBatchRequestQuery(ctrl, request, response, buf); + return ProcessBatchRequestQuery(true, ctrl, request, response, buf); } void TabletImpl::ChangeRole(RpcController* controller, const ::openmldb::api::ChangeRoleRequest* request, @@ -4319,6 +4323,12 @@ void TabletImpl::UpdateGlobalVarTable() { it->Next(); } std::atomic_store_explicit(&global_variables_, new_global_var, std::memory_order_relaxed); + + // no DEPLOY_STATS when init, so we can get the first DEPLOY_STATS value here, and all changes will be handled in + // and we assume that global vars change is low frequency, so we reset here instead of in the repeated task + if (!IsCollectDeployStatsEnabled()) { + deploy_collector_->Reset(); + } return; } @@ -5344,7 +5354,7 @@ void TabletImpl::DropProcedure(RpcController* controller, const ::openmldb::api: if (is_deployment_procedure) { auto collector_key = absl::StrCat(db_name, ".", sp_name); - auto s = deploy_collector_->DeleteDeploy(collector_key); + auto s = deploy_collector_->DeleteDeploy(db_name, sp_name); if (!s.ok()) { LOG(ERROR) << "[ERROR] delete deploy collector: " << s; } else { @@ -5497,31 +5507,12 @@ bool TabletImpl::IsCollectDeployStatsEnabled() const { } // try collect the cost time for a deployment procedure into collector -// if the procedure found in collector, it is colelcted directly -// if not, the function will try find the procedure info from procedure cache, and if turns out is a deployment -// procedure, retry collecting by firstly adding the missing deployment procedure into collector +// if failed, log inside, no extra handle void TabletImpl::TryCollectDeployStats(const std::string& db, const std::string& name, absl::Time start_time) { absl::Time now = absl::Now(); absl::Duration time = now - start_time; - const std::string 
deploy_name = absl::StrCat(db, ".", name); - auto s = deploy_collector_->Collect(deploy_name, time); - if (absl::IsNotFound(s)) { - // deploy collector is regarded as non-update-to-date cache for sp_info (sp_cache_ should be up-to-date) - // so when Not Found error happens, retry once again by AddDeploy first, with the help of sp_cache_ - auto sp_info = sp_cache_->FindSpProcedureInfo(db, name); - if (sp_info.ok() && sp_info.value()->GetType() == hybridse::sdk::kReqDeployment) { - s = deploy_collector_->AddDeploy(deploy_name); - if (!s.ok()) { - LOG(ERROR) << "[ERROR] add deploy collector: " << s; - return; - } - s = deploy_collector_->Collect(deploy_name, time); - } - } - if (!s.ok()) { - LOG(ERROR) << "[ERROR] collect deploy stat: " << s; - } - DLOG(INFO) << "collected " << deploy_name << " for " << time; + auto st = deploy_collector_->Collect(db, name, time); + DLOG(INFO) << "collect " << db << "." << name << " latency " << time; } void TabletImpl::BulkLoad(RpcController* controller, const ::openmldb::api::BulkLoadRequest* request, @@ -5783,14 +5774,7 @@ void TabletImpl::GetAndFlushDeployStats(::google::protobuf::RpcController* contr ::google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); - auto rs = deploy_collector_->Flush(); - for (auto& r : rs) { - auto new_row = response->add_rows(); - new_row->set_deploy_name(r.deploy_name_); - new_row->set_time(r.GetTimeAsStr(statistics::TimeUnit::MICRO_SECOND)); - new_row->set_count(r.count_); - new_row->set_total(r.GetTotalAsStr(statistics::TimeUnit::MICRO_SECOND)); - } + // TODO(hw): delete rpc? 
response->set_code(ReturnCode::kOk); } diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index 83135ad72e6..c24a253c9e9 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -33,9 +33,9 @@ #include "nameserver/system_table.h" #include "proto/tablet.pb.h" #include "replica/log_replicator.h" -#include "storage/aggregator.h" #include "sdk/sql_cluster_router.h" -#include "statistics/query_response_time/deploy_query_response_time.h" +#include "statistics/query_response_time/deployment_metric_collector.h" +#include "storage/aggregator.h" #include "storage/mem_table.h" #include "storage/mem_table_snapshot.h" #include "tablet/bulk_load_mgr.h" @@ -219,10 +219,10 @@ class TabletImpl : public ::openmldb::api::TabletServer { openmldb::api::QueryResponse* response, Closure* done); void CreateFunction(RpcController* controller, const openmldb::api::CreateFunctionRequest* request, - openmldb::api::CreateFunctionResponse* response, Closure* done); + openmldb::api::CreateFunctionResponse* response, Closure* done); void DropFunction(RpcController* controller, const openmldb::api::DropFunctionRequest* request, - openmldb::api::DropFunctionResponse* response, Closure* done); + openmldb::api::DropFunctionResponse* response, Closure* done); void SubQuery(RpcController* controller, const openmldb::api::QueryRequest* request, openmldb::api::QueryResponse* response, Closure* done); @@ -279,9 +279,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { public: explicit UpdateAggrClosure(const std::function& callback) : callback_(callback) {} - void Run() override { - callback_(); - } + void Run() override { callback_(); } private: std::function callback_; @@ -318,25 +316,22 @@ class TabletImpl : public ::openmldb::api::TabletServer { void DumpIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table, std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot, - uint32_t partition_num, - const 
std::vector<::openmldb::common::ColumnKey>& column_keys, + uint32_t partition_num, const std::vector<::openmldb::common::ColumnKey>& column_keys, uint64_t offset, std::shared_ptr<::openmldb::api::TaskInfo> task); void SendIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table, const std::map& pid_endpoint_map, std::shared_ptr<::openmldb::api::TaskInfo> task); - void LoadIndexDataInternal(uint32_t tid, uint32_t pid, uint32_t cur_pid, - uint32_t partition_num, uint64_t last_time, - std::shared_ptr<::openmldb::api::TaskInfo> task); + void LoadIndexDataInternal(uint32_t tid, uint32_t pid, uint32_t cur_pid, uint32_t partition_num, uint64_t last_time, + std::shared_ptr<::openmldb::api::TaskInfo> task); base::Status TruncateTableInternal(uint32_t tid, uint32_t pid); void ExtractIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table, - std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot, - const std::vector<::openmldb::common::ColumnKey>& column_key, - uint32_t partition_num, uint64_t offset, bool contain_dump, - std::shared_ptr<::openmldb::api::TaskInfo> task); + std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot, + const std::vector<::openmldb::common::ColumnKey>& column_key, uint32_t partition_num, + uint64_t offset, bool contain_dump, std::shared_ptr<::openmldb::api::TaskInfo> task); void SchedMakeSnapshot(); @@ -360,7 +355,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { int LoadTableInternal(uint32_t tid, uint32_t pid, std::shared_ptr<::openmldb::api::TaskInfo> task_ptr); int LoadDiskTableInternal(uint32_t tid, uint32_t pid, const ::openmldb::api::TableMeta& table_meta, - std::shared_ptr<::openmldb::api::TaskInfo> task_ptr); + std::shared_ptr<::openmldb::api::TaskInfo> task_ptr); int WriteTableMeta(const std::string& path, const ::openmldb::api::TableMeta* table_meta); int UpdateTableMeta(const std::string& path, ::openmldb::api::TableMeta* table_meta, bool for_add_column); @@ -373,8 
+368,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { void SetTaskStatus(std::shared_ptr<::openmldb::api::TaskInfo>& task_ptr, // NOLINT ::openmldb::api::TaskStatus status); - int GetTaskStatus(const std::shared_ptr<::openmldb::api::TaskInfo>& task_ptr, - ::openmldb::api::TaskStatus* status); + int GetTaskStatus(const std::shared_ptr<::openmldb::api::TaskInfo>& task_ptr, ::openmldb::api::TaskStatus* status); bool IsExistTaskUnLock(const ::openmldb::api::TaskInfo& task); @@ -400,8 +394,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { bool GetTableRootSize(uint32_t tid, uint32_t pid, const ::openmldb::common::StorageMode& mode, uint64_t& size); // NOLINT - int32_t GetSnapshotOffset(uint32_t tid, uint32_t pid, - ::openmldb::common::StorageMode storageMode, + int32_t GetSnapshotOffset(uint32_t tid, uint32_t pid, ::openmldb::common::StorageMode storageMode, std::string& msg, // NOLINT uint64_t& term, uint64_t& offset); // NOLINT @@ -411,9 +404,10 @@ class TabletImpl : public ::openmldb::api::TabletServer { bool GetRealEp(uint64_t tid, uint64_t pid, std::map* real_ep_map); - void ProcessQuery(RpcController* controller, const openmldb::api::QueryRequest* request, + void ProcessQuery(bool is_sub, RpcController* controller, const openmldb::api::QueryRequest* request, ::openmldb::api::QueryResponse* response, butil::IOBuf* buf); - void ProcessBatchRequestQuery(RpcController* controller, const openmldb::api::SQLBatchRequestQueryRequest* request, + void ProcessBatchRequestQuery(bool is_sub, RpcController* controller, + const openmldb::api::SQLBatchRequestQueryRequest* request, openmldb::api::SQLBatchRequestQueryResponse* response, butil::IOBuf& buf); // NOLINT @@ -421,11 +415,9 @@ class TabletImpl : public ::openmldb::api::TabletServer { const ::openmldb::storage::Dimensions& dimensions, uint64_t log_offset); bool CreateAggregatorInternal(const ::openmldb::api::CreateAggregatorRequest* request, - std::string& msg); //NOLINT + std::string& msg); // 
NOLINT - inline bool IsClusterMode() const { - return startup_mode_ == ::openmldb::type::StartupMode::kCluster; - } + inline bool IsClusterMode() const { return startup_mode_ == ::openmldb::type::StartupMode::kCluster; } std::string GetDBPath(const std::string& root_path, uint32_t tid, uint32_t pid); @@ -460,10 +452,8 @@ class TabletImpl : public ::openmldb::api::TabletServer { std::set sync_snapshot_set_; std::map> file_receiver_map_; BulkLoadMgr bulk_load_mgr_; - std::map<::openmldb::common::StorageMode, std::vector> - mode_root_paths_; - std::map<::openmldb::common::StorageMode, std::vector> - mode_recycle_root_paths_; + std::map<::openmldb::common::StorageMode, std::vector> mode_root_paths_; + std::map<::openmldb::common::StorageMode, std::vector> mode_recycle_root_paths_; std::atomic follower_; std::shared_ptr> real_ep_map_; // thread safe @@ -482,7 +472,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { std::shared_ptr> global_variables_; - std::unique_ptr deploy_collector_; + std::unique_ptr deploy_collector_; std::atomic memory_used_ = 0; };