Skip to content

Commit

Permalink
feat: change metric crate from metrics to prometheus (#2655)
Browse files Browse the repository at this point in the history
* feat: change metrics to prometheus

* chore: fix code advise

* chore: resolve merge conflict

* chore: fix code advise
  • Loading branch information
Taylor-lagrange authored Oct 31, 2023
1 parent e3320c5 commit 180bc64
Show file tree
Hide file tree
Showing 119 changed files with 1,130 additions and 1,137 deletions.
276 changes: 46 additions & 230 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
metrics = "0.20"
moka = "0.12"
once_cell = "1.18"
opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.git", rev = "33841b38dda79b15f2024952be5f32533325ca02", features = [
Expand All @@ -94,6 +93,7 @@ opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.gi
] }
parquet = "47.0"
paste = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
prost = "0.12"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ futures = "0.3"
futures-util.workspace = true
lazy_static.workspace = true
meta-client = { workspace = true }
metrics.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
partition.workspace = true
prometheus.workspace = true
regex.workspace = true
serde.workspace = true
serde_json = "1.0"
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_meta::rpc::store::{
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_telemetry::{debug, timer};
use common_telemetry::debug;
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -152,10 +152,10 @@ impl KvBackend for CachedMetaKvBackend {
}

async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let _timer = timer!(METRIC_CATALOG_KV_GET);
let _timer = METRIC_CATALOG_KV_GET.start_timer();

let init = async {
let _timer = timer!(METRIC_CATALOG_KV_REMOTE_GET);
let _timer = METRIC_CATALOG_KV_REMOTE_GET.start_timer();
self.kv_backend.get(key).await.map(|val| {
val.with_context(|| CacheNotGetSnafu {
key: String::from_utf8_lossy(key),
Expand Down
22 changes: 9 additions & 13 deletions src/catalog/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock, Weak};

use common_catalog::build_db_string;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME};
use metrics::{decrement_gauge, increment_gauge};
use snafu::OptionExt;
use table::TableRef;

Expand Down Expand Up @@ -166,7 +166,7 @@ impl MemoryCatalogManager {
let arc_self = Arc::new(self.clone());
let catalog = arc_self.create_catalog_entry(name);
e.insert(catalog);
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT.inc();
Ok(true)
}
Entry::Occupied(_) => Ok(false),
Expand All @@ -187,11 +187,9 @@ impl MemoryCatalogManager {
})?;
let result = schema.remove(&request.table_name);
if result.is_some() {
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
.with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
.dec();
}
Ok(())
}
Expand All @@ -210,7 +208,7 @@ impl MemoryCatalogManager {
match catalog.entry(request.schema) {
Entry::Vacant(e) => {
e.insert(HashMap::new());
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT.inc();
Ok(true)
}
Entry::Occupied(_) => Ok(false),
Expand Down Expand Up @@ -238,11 +236,9 @@ impl MemoryCatalogManager {
.fail();
}
schema.insert(request.table_name, request.table);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
.with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
.inc();
Ok(true)
}

Expand Down
28 changes: 17 additions & 11 deletions src/catalog/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_catalog::build_db_string;

pub(crate) const METRIC_DB_LABEL: &str = "db";

pub(crate) const METRIC_CATALOG_MANAGER_CATALOG_COUNT: &str = "catalog.catalog_count";
pub(crate) const METRIC_CATALOG_MANAGER_SCHEMA_COUNT: &str = "catalog.schema_count";
pub(crate) const METRIC_CATALOG_MANAGER_TABLE_COUNT: &str = "catalog.table_count";

pub(crate) const METRIC_CATALOG_KV_REMOTE_GET: &str = "catalog.kv.get.remote";
pub(crate) const METRIC_CATALOG_KV_GET: &str = "catalog.kv.get";
use lazy_static::lazy_static;
use prometheus::*;

#[inline]
pub(crate) fn db_label(catalog: &str, schema: &str) -> (&'static str, String) {
(METRIC_DB_LABEL, build_db_string(catalog, schema))
lazy_static! {
pub static ref METRIC_CATALOG_MANAGER_CATALOG_COUNT: IntGauge =
register_int_gauge!("catalog_catalog_count", "catalog catalog count").unwrap();
pub static ref METRIC_CATALOG_MANAGER_SCHEMA_COUNT: IntGauge =
register_int_gauge!("catalog_schema_count", "catalog schema count").unwrap();
pub static ref METRIC_CATALOG_MANAGER_TABLE_COUNT: IntGaugeVec = register_int_gauge_vec!(
"catalog_table_count",
"catalog table count",
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_CATALOG_KV_REMOTE_GET: Histogram =
register_histogram!("catalog_kv_get_remote", "catalog kv get remote").unwrap();
pub static ref METRIC_CATALOG_KV_GET: Histogram =
register_histogram!("catalog_kv_get", "catalog kv get").unwrap();
}
2 changes: 2 additions & 0 deletions src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ datatypes = { workspace = true }
derive_builder.workspace = true
enum_dispatch = "0.3"
futures-util.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
session = { workspace = true }
Expand Down
24 changes: 12 additions & 12 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamAdaptor;
use common_telemetry::{logging, timer};
use common_telemetry::logging;
use futures_util::StreamExt;
use prost::Message;
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -111,12 +111,12 @@ impl Database {
}

pub async fn insert(&self, requests: InsertRequests) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_INSERT);
let _timer = metrics::METRIC_GRPC_INSERT.start_timer();
self.handle(Request::Inserts(requests)).await
}

pub async fn row_insert(&self, requests: RowInsertRequests) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_INSERT);
let _timer = metrics::METRIC_GRPC_INSERT.start_timer();
self.handle(Request::RowInserts(requests)).await
}

Expand All @@ -141,7 +141,7 @@ impl Database {
}

pub async fn delete(&self, request: DeleteRequests) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_DELETE);
let _timer = metrics::METRIC_GRPC_DELETE.start_timer();
self.handle(Request::Deletes(request)).await
}

Expand Down Expand Up @@ -171,7 +171,7 @@ impl Database {
where
S: AsRef<str>,
{
let _timer = timer!(metrics::METRIC_GRPC_SQL);
let _timer = metrics::METRIC_GRPC_SQL.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::Sql(sql.as_ref().to_string())),
Expand All @@ -182,7 +182,7 @@ impl Database {
}

pub async fn logical_plan(&self, logical_plan: Vec<u8>, trace_id: u64) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_LOGICAL_PLAN);
let _timer = metrics::METRIC_GRPC_LOGICAL_PLAN.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
Expand All @@ -199,7 +199,7 @@ impl Database {
end: &str,
step: &str,
) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_PROMQL_RANGE_QUERY);
let _timer = metrics::METRIC_GRPC_PROMQL_RANGE_QUERY.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(PromRangeQuery {
Expand All @@ -215,7 +215,7 @@ impl Database {
}

pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_CREATE_TABLE);
let _timer = metrics::METRIC_GRPC_CREATE_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
Expand All @@ -226,7 +226,7 @@ impl Database {
}

pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_ALTER);
let _timer = metrics::METRIC_GRPC_ALTER.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
Expand All @@ -237,7 +237,7 @@ impl Database {
}

pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_DROP_TABLE);
let _timer = metrics::METRIC_GRPC_DROP_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
Expand All @@ -248,7 +248,7 @@ impl Database {
}

pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_TRUNCATE_TABLE);
let _timer = metrics::METRIC_GRPC_TRUNCATE_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::TruncateTable(expr)),
Expand All @@ -260,7 +260,7 @@ impl Database {

async fn do_get(&self, request: Request, trace_id: u64) -> Result<Output> {
// FIXME(paomian): should be added some labels for metrics
let _timer = timer!(metrics::METRIC_GRPC_DO_GET);
let _timer = metrics::METRIC_GRPC_DO_GET.start_timer();
let request = self.to_rpc_request(request, trace_id);
let request = Ticket {
ticket: request.encode_to_vec().into(),
Expand Down
43 changes: 31 additions & 12 deletions src/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! client metrics
pub const METRIC_GRPC_CREATE_TABLE: &str = "grpc.create_table";
pub const METRIC_GRPC_PROMQL_RANGE_QUERY: &str = "grpc.promql.range_query";
pub const METRIC_GRPC_INSERT: &str = "grpc.insert";
pub const METRIC_GRPC_DELETE: &str = "grpc.delete";
pub const METRIC_GRPC_SQL: &str = "grpc.sql";
pub const METRIC_GRPC_LOGICAL_PLAN: &str = "grpc.logical_plan";
pub const METRIC_GRPC_ALTER: &str = "grpc.alter";
pub const METRIC_GRPC_DROP_TABLE: &str = "grpc.drop_table";
pub const METRIC_GRPC_TRUNCATE_TABLE: &str = "grpc.truncate_table";
pub const METRIC_GRPC_DO_GET: &str = "grpc.do_get";
pub(crate) const METRIC_REGION_REQUEST_GRPC: &str = "grpc.region_request";
use lazy_static::lazy_static;
use prometheus::*;

lazy_static! {
pub static ref METRIC_GRPC_CREATE_TABLE: Histogram =
register_histogram!("grpc_create_table", "grpc create table").unwrap();
pub static ref METRIC_GRPC_PROMQL_RANGE_QUERY: Histogram =
register_histogram!("grpc_promql_range_query", "grpc promql range query").unwrap();
pub static ref METRIC_GRPC_INSERT: Histogram =
register_histogram!("grpc_insert", "grpc insert").unwrap();
pub static ref METRIC_GRPC_DELETE: Histogram =
register_histogram!("grpc_delete", "grpc delete").unwrap();
pub static ref METRIC_GRPC_SQL: Histogram =
register_histogram!("grpc_sql", "grpc sql").unwrap();
pub static ref METRIC_GRPC_LOGICAL_PLAN: Histogram =
register_histogram!("grpc_logical_plan", "grpc logical plan").unwrap();
pub static ref METRIC_GRPC_ALTER: Histogram =
register_histogram!("grpc_alter", "grpc alter").unwrap();
pub static ref METRIC_GRPC_DROP_TABLE: Histogram =
register_histogram!("grpc_drop_table", "grpc drop table").unwrap();
pub static ref METRIC_GRPC_TRUNCATE_TABLE: Histogram =
register_histogram!("grpc_truncate_table", "grpc truncate table").unwrap();
pub static ref METRIC_GRPC_DO_GET: Histogram =
register_histogram!("grpc_do_get", "grpc do get").unwrap();
pub static ref METRIC_REGION_REQUEST_GRPC: HistogramVec = register_histogram_vec!(
"grpc_region_request",
"grpc region request",
&["request_type"]
)
.unwrap();
}
10 changes: 4 additions & 6 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_meta::datanode_manager::{AffectedRows, Datanode};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_telemetry::{error, timer};
use common_telemetry::error;
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -152,11 +152,9 @@ impl RegionRequester {
.with_context(|| MissingFieldSnafu { field: "body" })?
.as_ref()
.to_string();

let _timer = timer!(
metrics::METRIC_REGION_REQUEST_GRPC,
&[("request_type", request_type)]
);
let _timer = metrics::METRIC_REGION_REQUEST_GRPC
.with_label_values(&[request_type.as_str()])
.start_timer();

let mut client = self.client.raw_region_client()?;

Expand Down
4 changes: 1 addition & 3 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ name = "greptime"
path = "src/bin/greptime.rs"

[features]
default = ["metrics-process"]
tokio-console = ["common-telemetry/tokio-console"]
metrics-process = ["servers/metrics-process"]

[dependencies]
anymap = "1.0.0-beta.2"
Expand Down Expand Up @@ -45,11 +43,11 @@ futures.workspace = true
lazy_static.workspace = true
meta-client = { workspace = true }
meta-srv = { workspace = true }
metrics.workspace = true
mito2 = { workspace = true }
nu-ansi-term = "0.46"
partition = { workspace = true }
plugins.workspace = true
prometheus.workspace = true
prost.workspace = true
query = { workspace = true }
rand.workspace = true
Expand Down
11 changes: 8 additions & 3 deletions src/cmd/src/bin/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ use cmd::error::Result;
use cmd::options::{Options, TopLevelOptions};
use cmd::{cli, datanode, frontend, metasrv, standalone};
use common_telemetry::logging::{error, info, TracingOptions};
use metrics::gauge;

lazy_static::lazy_static! {
static ref APP_VERSION: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("app_version", "app version", &["short_version", "version"]).unwrap();
}

#[derive(Parser)]
#[clap(name = "greptimedb", version = print_version())]
Expand Down Expand Up @@ -204,11 +208,12 @@ async fn main() -> Result<()> {
};

common_telemetry::set_panic_hook();
common_telemetry::init_default_metrics_recorder();
let _guard = common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts);

// Report app version as gauge.
gauge!("app_version", 1.0, "short_version" => short_version(), "version" => full_version());
APP_VERSION
.with_label_values(&[short_version(), full_version()])
.inc();

// Log version and argument flags.
info!(
Expand Down
4 changes: 2 additions & 2 deletions src/common/error/src/status_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

use std::fmt;

use strum::EnumString;
use strum::{AsRefStr, EnumString};

/// Common status code for public API.
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString, AsRefStr)]
pub enum StatusCode {
// ====== Begin of common status code ==============
/// Success.
Expand Down
Loading

0 comments on commit 180bc64

Please sign in to comment.