diff --git a/config/datanode.example.toml b/config/datanode.example.toml index bd3f8fc2eec9..c25aca47280d 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -129,11 +129,10 @@ parallel_scan_channel_size = 32 # [export_metrics] # whether enable export metrics, default is false # enable = false -# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. -# endpoint = "127.0.0.1:4000" -# The database name of exported metrics stores, user needs to specify a valid database -# db = "" # The interval of export metrics # write_interval = "30s" +# [export_metrics.remote_write] +# The url the metrics send to. The url is empty by default, url example: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema` +# url = "" # HTTP headers of Prometheus remote-write carry # headers = {} diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 6bb84e66d3ab..c8fe80cb14f7 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -87,11 +87,8 @@ tcp_nodelay = true # [export_metrics] # whether enable export metrics, default is false # enable = false -# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. -# endpoint = "127.0.0.1:4000" -# The database name of exported metrics stores, user needs to specify a valid database -# db = "" # The interval of export metrics # write_interval = "30s" -# HTTP headers of Prometheus remote-write carry -# headers = {} +# for `frontend`, `self_import` is recommend to collect metrics generated by itself +# [export_metrics.self_import] +# db = "information_schema" diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 8a55bee7913f..2664a088f334 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -86,11 +86,10 @@ provider = "raft_engine" # [export_metrics] # whether enable export metrics, default is false # enable = false -# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. -# endpoint = "127.0.0.1:4000" -# The database name of exported metrics stores, user needs to specify a valid database -# db = "" # The interval of export metrics # write_interval = "30s" +# [export_metrics.remote_write] +# The url the metrics send to. The url is empty by default, url example: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema` +# url = "" # HTTP headers of Prometheus remote-write carry # headers = {} diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 0fa58dd413c6..e82e8954ab5e 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -230,11 +230,8 @@ parallel_scan_channel_size = 32 # [export_metrics] # whether enable export metrics, default is false # enable = false -# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. -# endpoint = "127.0.0.1:4000" -# The database name of exported metrics stores, user needs to specify a valid database -# db = "" # The interval of export metrics # write_interval = "30s" -# HTTP headers of Prometheus remote-write carry -# headers = {} +# for `standalone`, `self_import` is recommend to collect metrics generated by itself +# [export_metrics.self_import] +# db = "information_schema" diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 607c7f3d7bd1..1f43c7ab0810 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -252,10 +252,6 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - instance - .build_export_metrics_task(&opts.export_metrics) - .context(StartFrontendSnafu)?; - instance .build_servers(opts) .await diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 27cc19fd6655..c5bcd3659f98 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -425,10 +425,6 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - frontend - .build_export_metrics_task(&opts.frontend.export_metrics) - .context(StartFrontendSnafu)?; - frontend .build_servers(opts) .await diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 0a0206eddc66..60900a6cdf72 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -98,7 +98,7 @@ impl Datanode { self.start_telemetry(); if let Some(t) = self.export_metrics_task.as_ref() { - t.start() + t.start(None).context(StartServerSnafu)? } self.start_services().await diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 59e6a7b4b2df..ff76fa11c3ca 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -55,7 +55,7 @@ use query::QueryEngineRef; use raft_engine::{Config, ReadableSize, RecoveryMode}; use servers::error as server_error; use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu}; -use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask}; +use servers::export_metrics::ExportMetricsTask; use servers::interceptor::{ PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef, }; @@ -76,6 +76,7 @@ use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; +use self::prom_store::ExportMetricHandler; use crate::error::{ self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu, @@ -190,18 +191,16 @@ impl Instance { &mut self, opts: impl Into + TomlSerializable, ) -> Result<()> { + let opts: FrontendOptions = opts.into(); + self.export_metrics_task = + ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins)) + .context(StartServerSnafu)?; let servers = Services::build(opts, Arc::new(self.clone()), self.plugins.clone()).await?; self.servers = Arc::new(servers); Ok(()) } - pub fn build_export_metrics_task(&mut self, opts: &ExportMetricsOption) -> Result<()> { - self.export_metrics_task = - ExportMetricsTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?; - Ok(()) - } - pub fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } @@ -232,7 +231,15 @@ impl FrontendInstance for Instance { self.script_executor.start(self)?; if let Some(t) = self.export_metrics_task.as_ref() { - t.start() + if t.send_by_handler { + let handler = ExportMetricHandler::new_handler( + self.inserter.clone(), + self.statement_executor.clone(), + ); + t.start(Some(handler)).context(StartServerSnafu)? + } else { + t.start(None).context(StartServerSnafu)?; + } } futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move { diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 7b1bd128f0ab..8821ff9a4ffa 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use api::prom_store::remote::read_request::ResponseType; use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; use async_trait::async_trait; @@ -21,10 +23,14 @@ use common_error::ext::BoxedError; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; +use operator::insert::InserterRef; +use operator::statement::StatementExecutor; use prost::Message; use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::prom_store::{self, Metrics}; -use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse}; +use servers::query_handler::{ + PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse, +}; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; @@ -209,3 +215,49 @@ impl PromStoreProtocolHandler for Instance { todo!(); } } + +/// This handler is mainly used for `frontend` or `standalone` to directly import +/// the metrics collected by itself, thereby avoiding importing metrics through the network, +/// thus reducing compression and network transmission overhead, +/// so only implement `PromStoreProtocolHandler::write` method. +pub struct ExportMetricHandler { + inserter: InserterRef, + statement_executor: Arc, +} + +impl ExportMetricHandler { + pub fn new_handler( + inserter: InserterRef, + statement_executor: Arc, + ) -> PromStoreProtocolHandlerRef { + Arc::new(Self { + inserter, + statement_executor, + }) + } +} + +#[async_trait] +impl PromStoreProtocolHandler for ExportMetricHandler { + async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> { + let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?; + self.inserter + .handle_row_inserts(requests, ctx, self.statement_executor.as_ref()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + Ok(()) + } + + async fn read( + &self, + _request: ReadRequest, + _ctx: QueryContextRef, + ) -> ServerResult { + unreachable!(); + } + + async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> { + unreachable!(); + } +} diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index eb937a7444be..4118e79e8185 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -94,7 +94,7 @@ impl MetaSrvInstance { self.meta_srv.try_start().await?; if let Some(t) = self.export_metrics_task.as_ref() { - t.start() + t.start(None).context(InitExportMetricsTaskSnafu)? } let (tx, rx) = mpsc::channel::<()>(1); diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs index 5a08f0a079e4..956d5ddd2f08 100644 --- a/src/servers/src/export_metrics.rs +++ b/src/servers/src/export_metrics.rs @@ -23,13 +23,14 @@ use common_time::Timestamp; use hyper::HeaderMap; use prost::Message; use reqwest::header::HeaderName; -use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; +use session::context::QueryContextBuilder; use snafu::{ensure, ResultExt}; -use tokio::time; +use tokio::time::{self, Interval}; use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu}; use crate::prom_store::snappy_compress; +use crate::query_handler::PromStoreProtocolHandlerRef; /// Use to export the metrics generated by greptimedb, encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/), /// and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) @@ -37,21 +38,40 @@ use crate::prom_store::snappy_compress; #[serde(default)] pub struct ExportMetricsOption { pub enable: bool, - pub endpoint: String, - pub db: String, #[serde(with = "humantime_serde")] pub write_interval: Duration, + pub self_import: Option, + pub remote_write: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(default)] +pub struct RemoteWriteOption { + pub url: String, pub headers: HashMap, } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct SelfImportOption { + pub db: String, +} + +impl Default for SelfImportOption { + fn default() -> Self { + Self { + db: "information_schema".to_string(), + } + } +} + impl Default for ExportMetricsOption { fn default() -> Self { Self { enable: false, - endpoint: "127.0.0.1:4000".to_string(), - db: String::new(), write_interval: Duration::from_secs(30), - headers: HashMap::new(), + self_import: None, + remote_write: None, } } } @@ -61,6 +81,7 @@ pub struct ExportMetricsTask { config: ExportMetricsOption, filter: Option, headers: HeaderMap, + pub send_by_handler: bool, } impl ExportMetricsTask { @@ -79,101 +100,244 @@ impl ExportMetricsTask { } ); ensure!( - !config.db.is_empty(), + (config.remote_write.is_none() && config.self_import.is_some()) + || (config.remote_write.is_some() && config.self_import.is_none()), InvalidExportMetricsConfigSnafu { - msg: "Expected export metrics db not empty" + msg: "Only one of `self_import` or `remote_write` can be used as the export method" } ); - // construct http header - let mut headers = reqwest::header::HeaderMap::with_capacity(config.headers.len()); - config.headers.iter().try_for_each(|(k, v)| { - let header = match TryInto::::try_into(k) { - Ok(header) => header, - Err(_) => { - return InvalidExportMetricsConfigSnafu { - msg: format!("Export metrics: invalid HTTP header name: {}", k), - } - .fail() + if let Some(self_import) = &config.self_import { + ensure!( + !self_import.db.is_empty(), + InvalidExportMetricsConfigSnafu { + msg: "Expected `self_import` metrics `db` not empty" } - }; - match TryInto::::try_into(v) { - Ok(value) => headers.insert(header, value), - Err(_) => { - return InvalidExportMetricsConfigSnafu { - msg: format!("Export metrics: invalid HTTP header value: {}", v), - } - .fail() + ); + } + let mut headers = reqwest::header::HeaderMap::new(); + if let Some(remote_write) = &config.remote_write { + ensure!( + !remote_write.url.is_empty(), + InvalidExportMetricsConfigSnafu { + msg: "Expected `remote_write` metrics `url` not empty" } - }; - Ok(()) - })?; + ); + // construct http header + remote_write.headers.iter().try_for_each(|(k, v)| { + let header = match TryInto::::try_into(k) { + Ok(header) => header, + Err(_) => { + return InvalidExportMetricsConfigSnafu { + msg: format!("Export metrics: invalid HTTP header name: {}", k), + } + .fail() + } + }; + match TryInto::::try_into(v) { + Ok(value) => headers.insert(header, value), + Err(_) => { + return InvalidExportMetricsConfigSnafu { + msg: format!("Export metrics: invalid HTTP header value: {}", v), + } + .fail() + } + }; + Ok(()) + })?; + } Ok(Some(Self { config: config.clone(), filter, headers, + send_by_handler: config.self_import.is_some(), })) } - pub fn start(&self) { + pub fn start(&self, handler: Option) -> Result<()> { if !self.config.enable { - return; + return Ok(()); } - let mut interval = time::interval(self.config.write_interval); - let sec = self.config.write_interval.as_secs(); - let endpoint = format!( - "http://{}/v1/prometheus/write?db={}", - self.config.endpoint, self.config.db - ); + let interval = time::interval(self.config.write_interval); let filter = self.filter.clone(); - let headers = self.headers.clone(); - let _handle = common_runtime::spawn_bg(async move { - info!( - "Start export metrics task to endpoint: {}, interval: {}s", - endpoint, sec + let _handle = if let Some(self_import) = &self.config.self_import { + ensure!( + handler.is_some(), + InvalidExportMetricsConfigSnafu { + msg: "Only `frontend` or `standalone` can use `self_import` as export method." + } ); - // Pass the first tick. Because the first tick completes immediately. - interval.tick().await; - let client = reqwest::Client::new(); - loop { - interval.tick().await; - match write_system_metric(&client, &endpoint, filter.as_ref(), headers.clone()) - .await - { - Ok(resp) => { - if !resp.status().is_success() { - error!("report export metrics error, msg: {:#?}", resp); - } - } - Err(e) => error!("report export metrics failed, error {}", e), - }; + common_runtime::spawn_bg(write_system_metric_by_handler( + self_import.db.clone(), + handler.unwrap(), + filter, + interval, + )) + } else if let Some(remote_write) = &self.config.remote_write { + common_runtime::spawn_bg(write_system_metric_by_network( + self.headers.clone(), + remote_write.url.clone(), + filter, + interval, + )) + } else { + unreachable!() + }; + Ok(()) + } +} + +/// Send metrics collected by standard Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/) +pub async fn write_system_metric_by_network( + headers: HeaderMap, + endpoint: String, + filter: Option, + mut interval: Interval, +) { + info!( + "Start export metrics task to endpoint: {}, interval: {}s", + endpoint, + interval.period().as_secs() + ); + // Pass the first tick. Because the first tick completes immediately. + interval.tick().await; + let client = reqwest::Client::new(); + loop { + interval.tick().await; + let metric_families = prometheus::gather(); + let request = convert_metric_to_write_request( + metric_families, + filter.as_ref(), + Timestamp::current_millis().value(), + ); + let resp = match snappy_compress(&request.encode_to_vec()) { + Ok(body) => client + .post(endpoint.as_str()) + .header("X-Prometheus-Remote-Write-Version", "0.1.0") + .header("Content-Type", "application/x-protobuf") + .headers(headers.clone()) + .body(body) + .send() + .await + .context(SendPromRemoteRequestSnafu), + Err(e) => Err(e), + }; + match resp { + Ok(resp) => { + if !resp.status().is_success() { + error!("report export metrics error, msg: {:#?}", resp); + } } - }); + Err(e) => error!("report export metrics failed, error {}", e), + }; } } -/// Export the collected metrics, encode metrics into [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/), -/// and send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`) specified by `url`. -/// User could use `MetricFilter` to filter metric they don't want collect -pub async fn write_system_metric( - client: &Client, - url: &str, - filter: Option<&MetricFilter>, - headers: HeaderMap, -) -> Result { - let metric_families = prometheus::gather(); - let request = convert_metric_to_write_request( - metric_families, - filter, - Timestamp::current_millis().value(), +/// Send metrics collected by our internal handler +/// for case `frontend` and `standalone` dispose it's own metrics, +/// reducing compression and network transmission overhead. +pub async fn write_system_metric_by_handler( + db: String, + handler: PromStoreProtocolHandlerRef, + filter: Option, + mut interval: Interval, +) { + info!( + "Start export metrics task by handler, interval: {}s", + interval.period().as_secs() ); - // RemoteWrite format require compress by snappy - client - .post(url) - .header("X-Prometheus-Remote-Write-Version", "0.1.0") - .header("Content-Type", "application/x-protobuf") - .headers(headers) - .body(snappy_compress(&request.encode_to_vec())?) - .send() - .await - .context(SendPromRemoteRequestSnafu) + // Pass the first tick. Because the first tick completes immediately. + interval.tick().await; + let ctx = QueryContextBuilder::default().current_schema(db).build(); + loop { + interval.tick().await; + let metric_families = prometheus::gather(); + let request = convert_metric_to_write_request( + metric_families, + filter.as_ref(), + Timestamp::current_millis().value(), + ); + if let Err(e) = handler.write(request, ctx.clone()).await { + error!("report export metrics by handler failed, error {}", e); + } + } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use crate::export_metrics::{ + ExportMetricsOption, ExportMetricsTask, RemoteWriteOption, SelfImportOption, + }; + + #[tokio::test] + async fn test_config() { + // zero write_interval + assert!(ExportMetricsTask::try_new( + &ExportMetricsOption { + enable: true, + write_interval: Duration::from_secs(0), + ..Default::default() + }, + None + ) + .is_err()); + // none self_import and remote_write + assert!(ExportMetricsTask::try_new( + &ExportMetricsOption { + enable: true, + ..Default::default() + }, + None + ) + .is_err()); + // both self_import and remote_write + assert!(ExportMetricsTask::try_new( + &ExportMetricsOption { + enable: true, + self_import: Some(SelfImportOption::default()), + remote_write: Some(RemoteWriteOption::default()), + ..Default::default() + }, + None + ) + .is_err()); + // empty db + assert!(ExportMetricsTask::try_new( + &ExportMetricsOption { + enable: true, + self_import: Some(SelfImportOption { db: "".to_string() }), + remote_write: None, + ..Default::default() + }, + None + ) + .is_err()); + // empty url + assert!(ExportMetricsTask::try_new( + &ExportMetricsOption { + enable: true, + self_import: None, + remote_write: Some(RemoteWriteOption { + url: "".to_string(), + ..Default::default() + }), + ..Default::default() + }, + None + ) + .is_err()); + // self import but no handle + let s = ExportMetricsTask::try_new( + &ExportMetricsOption { + enable: true, + self_import: Some(SelfImportOption::default()), + ..Default::default() + }, + None, + ) + .unwrap() + .unwrap(); + assert!(s.start(None).is_err()); + } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 9341ba5f09ce..42843ac22e81 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -756,11 +756,8 @@ tcp_nodelay = true [frontend.export_metrics] enable = false -db = "" write_interval = "30s" -[frontend.export_metrics.headers] - [datanode] mode = "standalone" node_id = 0 @@ -820,11 +817,8 @@ append_stdout = true [datanode.export_metrics] enable = false -db = "" write_interval = "30s" -[datanode.export_metrics.headers] - [logging] enable_otlp_tracing = false append_stdout = true