Skip to content

Commit

Permalink
Merge remote-tracking branch 'veg/p99-metric' into p99-metric-pr
Browse files Browse the repository at this point in the history
  • Loading branch information
vagetablechicken committed Nov 10, 2023
2 parents 9e03d53 + 8a44427 commit 8c78b52
Show file tree
Hide file tree
Showing 20 changed files with 407 additions and 438 deletions.
32 changes: 21 additions & 11 deletions docs/zh/maintain/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ OpenMLDB exporter 是以 Python 实现的 Prometheus exporter,核心是通过

2. 启动 OpenMLDB

参见 [install_deploy](../deploy/install_deploy.md) 如何搭建 OpenMLDB。组件启动时需要保证有 flag `--enable_status_service=true`, 或者确认启动 flag 文件 (`conf/(tablet|nameserver).flags`) 中有 `--enable_status_service=true`
参见 [install_deploy](../deploy/install_deploy.md) 如何搭建 OpenMLDB。组件启动时需要保证有 flag `--enable_status_service=true`, OpenMLDB启动脚本(无论是sbin或bin)都已配置为true,如果你使用个人方式启动,需要保证启动 flag 文件 (`conf/(tablet|nameserver).flags`) 中有 `--enable_status_service=true`

默认启动脚本 `bin/start.sh` 开启了 server status, 不需要额外配置。

3. 注意:合理选择 OpenMLDB 各组件和 OpenMLDB exporter, 以及 Prometheus, Grafana 的绑定 IP 地址,确保 Grafana 可以访问到 Prometheus, 并且 Prometheus,OpenMLDB exporter 和 OpenMLDB 各个组件之间可以相互访问。

### 部署 OpenMLDB exporter
Expand Down Expand Up @@ -168,13 +166,6 @@ OpenMLDB 提供了 Prometheus 和 Grafana 配置文件以作参考,详见 [Ope
- component status: 集群组件状态
- table status: 数据库表相关信息,如 `rows_count`, `memory_bytes`
- deploy query response time: deployment query 在 tablet 内部的运行时间

**除了 deploy query response time 指标外, 成功配置监控之后都可以直接查询到指标. Deploy query response time 需要全局变量 `deploy_stats` 开启后才会有数据, 在 OpenMLDB CLI 中输入 SQL:**

```sql
SET GLOBAL deploy_stats = 'on';
```

你可以通过

Expand All @@ -184,9 +175,27 @@ OpenMLDB 提供了 Prometheus 和 Grafana 配置文件以作参考,详见 [Ope

查看完整 DB-Level 指标和帮助信息。

通过Component-Level 指标通过Grafana聚合的DB-Level 指标(未单独声明时,time单位为us):

- deploy query response time: deployment query 在OpenMLDB内部的运行时间,按DB.DEPLOYMENT汇总
**需要全局变量 `deploy_stats` 开启后才会开始统计, 在 OpenMLDB CLI 中输入 SQL:**

```sql
SET GLOBAL deploy_stats = 'on';
```
然后,还需要执行deplpoyment,才会出现相应的指标。
如果SET变量为off,会清空server中的所有deployment指标并停止统计(已被Prometheus抓取的数据不影响)。
- count:count类统计值从deploy_stats on时开始统计,不区分请求的成功和失败。
- latency, qps:这类指标只统计`[current_time - interval, current_time]`时间窗口内的数据,interval由Tablet Server配置项`bvar_dump_interval`配置,默认为75秒。

- api server http time: 各API接口的处理耗时(不包含route),只监测接口耗时,不做细粒度区分,目前也不通过Grafana展示,可以通过Prometheus手动查询。目前监测`deployment`、`sp`和`query`三种方法。
- api server route time: APIServer进行http route的耗时,通常为us级别,一般忽略不计

以上聚合指标的获取方式见下文。在组件指标中,deploy query response time关键字为`deployment`,api server http time关键字为`http_method`。如果指标展示不正常,可以查询组件指标定位问题。

### 2. Component-Level 指标

OpenMLDB 的相关组件(即 nameserver, tablet, etc), 本身作为 BRPC server,暴露了 [Prometheus 相关指标](https://github.com/apache/incubator-brpc/blob/master/docs/en/bvar.md#export-to-prometheus), 只需要配置 Prometheus server 从对应地址拉取指标即可。对应 `prometheus_example.yml`中 `job_name=openmldb_components` 项:
OpenMLDB 的相关组件(即 nameserver, tablet, etc, 本身作为 BRPC server,暴露了 [Prometheus 相关指标](https://github.com/apache/brpc/blob/master/docs/en/bvar.md#export-to-prometheus), 只需要配置 Prometheus server 从对应地址拉取指标即可。对应 `prometheus_example.yml`中 `job_name=openmldb_components` 项:

```yaml
- job_name: openmldb_components
Expand All @@ -203,6 +212,7 @@ OpenMLDB 的相关组件(即 nameserver, tablet, etc), 本身作为 BRPC serve

- BRPC server 进程相关信息
- 对应 BRPC server 定义的 RPC method 相关指标,例如该 RPC 的请求 `count`, `error_count`, `qps` 和 `response_time`
- Deployment 相关指标,分deployment统计,但只统计该tablet上的deployment请求。区别于Grafana做的集群级别整合。

通过

Expand Down
1 change: 1 addition & 0 deletions release/conf/apiserver.flags.template
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
--log_level=info

#--thread_pool_size=16
--bvar_max_dump_multi_dimension_metric_number=10
2 changes: 2 additions & 0 deletions release/conf/tablet.flags.template
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,5 @@

# turn this option on to export openmldb metric status
# --enable_status_service=false
--bvar_max_dump_multi_dimension_metric_number=10
--bvar_dump_interval=75
67 changes: 42 additions & 25 deletions src/apiserver/api_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
#include <string>

#include "apiserver/interface_provider.h"

#include "absl/cleanup/cleanup.h"
#include "brpc/server.h"
#include "butil/time.h"

namespace openmldb {
namespace apiserver {

APIServerImpl::APIServerImpl(const std::string& endpoint)
: md_recorder_("rpc_server_" + endpoint.substr(endpoint.find(":") + 1), "http_method", {"method"}),
provider_("rpc_server_" + endpoint.substr(endpoint.find(":") + 1)) {}

APIServerImpl::~APIServerImpl() = default;

bool APIServerImpl::Init(const sdk::ClusterOptions& options) {
Expand Down Expand Up @@ -72,7 +79,6 @@ void APIServerImpl::Process(google::protobuf::RpcController* cntl_base, const Ht
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
auto* cntl = dynamic_cast<brpc::Controller*>(cntl_base);

// The unresolved path has no slashes at the beginning(guaranteed by brpc), it's not good for url parsing
auto unresolved_path = "/" + cntl->http_request().unresolved_path();
auto method = cntl->http_request().method();
Expand All @@ -81,7 +87,6 @@ void APIServerImpl::Process(google::protobuf::RpcController* cntl_base, const Ht

JsonWriter writer;
provider_.handle(unresolved_path, method, req_body, writer);

cntl->response_attachment().append(writer.GetString());
}

Expand Down Expand Up @@ -110,6 +115,12 @@ std::map<std::string, ExecContext> mode_map{
void APIServerImpl::RegisterQuery() {
provider_.post("/dbs/:db_name", [this](const InterfaceProvider::Params& param, const butil::IOBuf& req_body,
JsonWriter& writer) {
auto start = absl::Now();
absl::Cleanup method_latency = [this, start]() {
// TODO(hw): query should split into async/sync, online/offline?
absl::Duration time = absl::Now() - start;
*md_recorder_.get_stats({"query"}) << absl::ToInt64Microseconds(time);
};
auto resp = GeneralResp();
auto db_it = param.find("db_name");
if (db_it == param.end()) {
Expand Down Expand Up @@ -158,9 +169,9 @@ void APIServerImpl::RegisterQuery() {
});
}

bool APIServerImpl::JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row) {
absl::Status APIServerImpl::JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row) {
auto sch = row->GetSchema();

// scan all strings to init the total string length
Expand All @@ -186,18 +197,20 @@ bool APIServerImpl::JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_c
for (decltype(sch->GetColumnCnt()) i = 0; i < sch->GetColumnCnt(); ++i) {
if (sch->IsConstant(i)) {
if (!AppendJsonValue(common_cols_v[common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) {
return false;
return absl::InvalidArgumentError(
absl::StrCat("trans const ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed"));
}
++common_idx;
} else {
if (!AppendJsonValue(non_common_cols_v[non_common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i),
row)) {
return false;
return absl::InvalidArgumentError(
absl::StrCat("trans ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed"));
}
++non_common_idx;
}
}
return true;
return absl::OkStatus();
}

template <typename T>
Expand Down Expand Up @@ -281,9 +294,9 @@ bool APIServerImpl::AppendJsonValue(const butil::rapidjson::Value& v, hybridse::
}

// common_cols_v is still an array, but non_common_cols_v is map, should find the value by the column name
bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row) {
absl::Status APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row) {
auto sch = row->GetSchema();

// scan all strings to init the total string length
Expand All @@ -300,8 +313,7 @@ bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_com
if (sch->GetColumnType(i) == hybridse::sdk::kTypeString) {
auto v = non_common_cols_v.FindMember(sch->GetColumnName(i).c_str());
if (v == non_common_cols_v.MemberEnd()) {
LOG(WARNING) << "can't find " << sch->GetColumnName(i);
return false;
return absl::InvalidArgumentError("can't find col " + sch->GetColumnName(i));
}
str_len_sum += v->value.GetStringLength();
}
Expand All @@ -313,23 +325,22 @@ bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_com
for (decltype(sch->GetColumnCnt()) i = 0; i < sch->GetColumnCnt(); ++i) {
if (sch->IsConstant(i)) {
if (!AppendJsonValue(common_cols_v[common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) {
LOG(WARNING) << "set " << sch->GetColumnName(i) << " failed";
return false;
return absl::InvalidArgumentError(
absl::StrCat("trans const ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed"));
}
++common_idx;
} else {
auto v = non_common_cols_v.FindMember(sch->GetColumnName(i).c_str());
if (v == non_common_cols_v.MemberEnd()) {
LOG(WARNING) << "can't find " << sch->GetColumnName(i);
return false;
return absl::InvalidArgumentError("can't find " + sch->GetColumnName(i));
}
if (!AppendJsonValue(v->value, sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) {
LOG(WARNING) << "set " << sch->GetColumnName(i) << " failed";
return false;
return absl::InvalidArgumentError(
absl::StrCat("trans ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed"));
}
}
}
return true;
return absl::OkStatus();
}

void APIServerImpl::RegisterPut() {
Expand Down Expand Up @@ -420,6 +431,11 @@ void APIServerImpl::RegisterExecSP() {

void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvider::Params& param,
const butil::IOBuf& req_body, JsonWriter& writer) {
auto start = absl::Now();
absl::Cleanup method_latency = [this, start, has_common_col]() {
absl::Duration time = absl::Now() - start;
*md_recorder_.get_stats({has_common_col ? "sp" : "deployment"}) << absl::ToInt64Microseconds(time);
};
auto resp = GeneralResp();
auto db_it = param.find("db_name");
auto sp_it = param.find("sp_name");
Expand Down Expand Up @@ -458,7 +474,6 @@ void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvide
return;
}
const auto& rows = input->value;

hybridse::sdk::Status status;
// We need to use ShowProcedure to get input schema(should know which column is constant).
// GetRequestRowByProcedure can't do that.
Expand Down Expand Up @@ -498,13 +513,15 @@ void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvide
writer << resp.Set("Invalid input data size in row " + std::to_string(i));
return;
}
if (!JsonArray2SQLRequestRow(rows[i], common_cols_v, row)) {
writer << resp.Set("Translate to request row failed in array row " + std::to_string(i));
if (auto st = JsonArray2SQLRequestRow(rows[i], common_cols_v, row); !st.ok()) {
writer << resp.Set("Translate to request row failed in array row " + std::to_string(i) + ", " +
st.ToString());
return;
}
} else if (rows[i].IsObject()) {
if (!JsonMap2SQLRequestRow(rows[i], common_cols_v, row)) {
writer << resp.Set("Translate to request row failed in map row " + std::to_string(i));
if (auto st = JsonMap2SQLRequestRow(rows[i], common_cols_v, row); !st.ok()) {
writer << resp.Set("Translate to request row failed in map row " + std::to_string(i) + ", " +
st.ToString());
return;
}
} else {
Expand Down
26 changes: 18 additions & 8 deletions src/apiserver/api_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
#include "sdk/sql_cluster_router.h"
#include "sdk/sql_request_row.h"

#include "absl/status/status.h"
#include "bvar/bvar.h"
#include "bvar/multi_dimension.h" // latency recorder

namespace openmldb {
namespace apiserver {

Expand All @@ -45,7 +49,7 @@ using butil::rapidjson::Writer;
// Both input and output are json data. We use rapidjson to handle it.
class APIServerImpl : public APIServer {
public:
APIServerImpl() = default;
explicit APIServerImpl(const std::string& endpoint);
~APIServerImpl() override;
bool Init(const sdk::ClusterOptions& options);
bool Init(::openmldb::sdk::DBSDK* cluster);
Expand All @@ -69,10 +73,10 @@ class APIServerImpl : public APIServer {
void ExecuteProcedure(bool has_common_col, const InterfaceProvider::Params& param, const butil::IOBuf& req_body,
JsonWriter& writer); // NOLINT

static bool JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row);
static bool JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
static absl::Status JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row);
static absl::Status JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row);
template <typename T>
Expand All @@ -82,13 +86,19 @@ class APIServerImpl : public APIServer {
// may get segmentation fault when throw boost::bad_lexical_cast, so we use std::from_chars
template <typename T>
static bool FromString(const std::string& s, T& value) { // NOLINT
auto res = std::from_chars(s.data(), s.data() + s.size(), value);
return res.ec == std::errc() && (res.ptr - s.data() == s.size());
if (auto res = std::from_chars(s.data(), s.data() + s.size(), value); res.ec == std::errc()) {
auto len = res.ptr - s.data();
return len >= 0 ? (uint64_t)len == s.size() : false;
} else {
return false;
}
}

private:
std::shared_ptr<sdk::SQLRouter> sql_router_;
bvar::MultiDimension<bvar::LatencyRecorder> md_recorder_;
InterfaceProvider provider_;

std::shared_ptr<sdk::SQLRouter> sql_router_;
// cluster_sdk_ is not owned by this class.
::openmldb::sdk::DBSDK* cluster_sdk_ = nullptr;
};
Expand Down
3 changes: 2 additions & 1 deletion src/apiserver/api_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,8 @@ TEST_F(APIServerTest, jsonInput) {
ASSERT_FALSE(show_cntl.Failed()) << show_cntl.ErrorText();
LOG(INFO) << "get sp resp: " << show_cntl.response_attachment();

// call deployment in json style input(won't check if it's a sp or deployment)
// call sp in deployment api with json style input(won't check if it's a sp or deployment), so it'll have field
// `common_cols_data`
{
brpc::Controller cntl;
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
Expand Down
7 changes: 7 additions & 0 deletions src/apiserver/interface_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <deque>

#include "boost/algorithm/string/split.hpp"
#include "butil/time.h"
#include "bvar/bvar.h"
#include "glog/logging.h"

namespace openmldb {
Expand Down Expand Up @@ -159,6 +161,8 @@ void InterfaceProvider::registerRequest(brpc::HttpMethod type, std::string const

bool InterfaceProvider::handle(const std::string& path, const brpc::HttpMethod& method, const butil::IOBuf& req_body,
JsonWriter& writer) {
butil::Timer tm;
tm.start();
auto err = GeneralResp();
Url url;

Expand Down Expand Up @@ -190,6 +194,9 @@ bool InterfaceProvider::handle(const std::string& path, const brpc::HttpMethod&
}

auto params = extractParameters(url, request->url);
tm.stop();
route_recorder_ << tm.u_elapsed();

request->callback(params, req_body, writer);
return true;
}
Expand Down
6 changes: 5 additions & 1 deletion src/apiserver/interface_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "apiserver/json_helper.h"
#include "brpc/http_method.h" // HttpMethod
#include "butil/iobuf.h" // IOBuf
#include "bvar/bvar.h" // latency recorder
#include "proto/api_server.pb.h"

namespace openmldb {
Expand Down Expand Up @@ -111,7 +112,7 @@ class ReducedUrlParser {

class InterfaceProvider {
public:
InterfaceProvider() = default;
explicit InterfaceProvider(const std::string& metric_prefix) : route_recorder_(metric_prefix, "http_route") {}
InterfaceProvider& operator=(InterfaceProvider const&) = delete;
InterfaceProvider(InterfaceProvider const&) = delete;

Expand Down Expand Up @@ -160,6 +161,9 @@ class InterfaceProvider {
void registerRequest(brpc::HttpMethod, const std::string& path, std::function<func>&& callback);

private:
// we only record route latency, method latency is recorded in callback(you may need record in parts), defined in
// api server impl
bvar::LatencyRecorder route_recorder_;
std::unordered_map<int, std::vector<BuiltRequest>> requests_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/client/tablet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ bool TabletClient::SubBatchRequestQuery(const ::openmldb::api::SQLBatchRequestQu
if (callback == nullptr) {
return false;
}
return client_.SendRequest(&::openmldb::api::TabletServer_Stub::SQLBatchRequestQuery,
return client_.SendRequest(&::openmldb::api::TabletServer_Stub::SubBatchRequestQuery,
callback->GetController().get(), &request, callback->GetResponse().get(), callback);
}

Expand Down
2 changes: 1 addition & 1 deletion src/cmd/openmldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3878,7 +3878,7 @@ void StartAPIServer() {
GetRealEndpoint(&real_endpoint);
}

auto api_service = std::make_unique<::openmldb::apiserver::APIServerImpl>();
auto api_service = std::make_unique<::openmldb::apiserver::APIServerImpl>(real_endpoint);
if (!FLAGS_nameserver.empty()) {
std::vector<std::string> vec;
boost::split(vec, FLAGS_nameserver, boost::is_any_of(":"));
Expand Down
Loading

0 comments on commit 8c78b52

Please sign in to comment.