fix: Optimize export metric behavior (#3047)
* fix: optimze export metric bahavior

* chor: fix ci

* chore: update config format

* chore: fix format
Taylor-lagrange authored Jan 4, 2024
1 parent ec43b91 commit 4d250ed
Showing 12 changed files with 329 additions and 128 deletions.
7 changes: 3 additions & 4 deletions config/datanode.example.toml
@@ -129,11 +129,10 @@ parallel_scan_channel_size = 32
# [export_metrics]
# whether enable export metrics, default is false
# enable = false
# The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
# endpoint = "127.0.0.1:4000"
# The database name of exported metrics stores, user needs to specify a valid database
# db = ""
# The interval of export metrics
# write_interval = "30s"
# [export_metrics.remote_write]
# The url the metrics send to. The url is empty by default, url example: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`
# url = ""
# HTTP headers of Prometheus remote-write carry
# headers = {}
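With this change, a datanode (and metasrv) no longer points at a frontend `endpoint`/`db` pair; it pushes its metrics to a Prometheus remote-write URL instead. A minimal enabled configuration under the new format might look like the following sketch (values are illustrative, taken from the commented example above):

[export_metrics]
enable = true
write_interval = "30s"

[export_metrics.remote_write]
url = "http://127.0.0.1:4000/v1/prometheus/write?db=information_schema"
headers = {}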
9 changes: 3 additions & 6 deletions config/frontend.example.toml
@@ -87,11 +87,8 @@ tcp_nodelay = true
# [export_metrics]
# whether enable export metrics, default is false
# enable = false
# The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
# endpoint = "127.0.0.1:4000"
# The database name of exported metrics stores, user needs to specify a valid database
# db = ""
# The interval of export metrics
# write_interval = "30s"
# HTTP headers of Prometheus remote-write carry
# headers = {}
# for `frontend`, `self_import` is recommend to collect metrics generated by itself
# [export_metrics.self_import]
# db = "information_schema"
7 changes: 3 additions & 4 deletions config/metasrv.example.toml
@@ -86,11 +86,10 @@ provider = "raft_engine"
# [export_metrics]
# whether enable export metrics, default is false
# enable = false
# The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
# endpoint = "127.0.0.1:4000"
# The database name of exported metrics stores, user needs to specify a valid database
# db = ""
# The interval of export metrics
# write_interval = "30s"
# [export_metrics.remote_write]
# The url the metrics send to. The url is empty by default, url example: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`
# url = ""
# HTTP headers of Prometheus remote-write carry
# headers = {}
9 changes: 3 additions & 6 deletions config/standalone.example.toml
@@ -230,11 +230,8 @@ parallel_scan_channel_size = 32
# [export_metrics]
# whether enable export metrics, default is false
# enable = false
# The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
# endpoint = "127.0.0.1:4000"
# The database name of exported metrics stores, user needs to specify a valid database
# db = ""
# The interval of export metrics
# write_interval = "30s"
# HTTP headers of Prometheus remote-write carry
# headers = {}
# for `standalone`, `self_import` is recommend to collect metrics generated by itself
# [export_metrics.self_import]
# db = "information_schema"
4 changes: 0 additions & 4 deletions src/cmd/src/frontend.rs
@@ -252,10 +252,6 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

instance
.build_export_metrics_task(&opts.export_metrics)
.context(StartFrontendSnafu)?;

instance
.build_servers(opts)
.await
4 changes: 0 additions & 4 deletions src/cmd/src/standalone.rs
@@ -430,10 +430,6 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

frontend
.build_export_metrics_task(&opts.frontend.export_metrics)
.context(StartFrontendSnafu)?;

frontend
.build_servers(opts)
.await
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
@@ -99,7 +99,7 @@ impl Datanode {
self.start_telemetry();

if let Some(t) = self.export_metrics_task.as_ref() {
t.start()
t.start(None).context(StartServerSnafu)?
}

self.start_services().await
23 changes: 15 additions & 8 deletions src/frontend/src/instance.rs
@@ -55,7 +55,7 @@ use query::QueryEngineRef;
use raft_engine::{Config, ReadableSize, RecoveryMode};
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::export_metrics::ExportMetricsTask;
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
@@ -76,6 +76,7 @@ use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;

use self::prom_store::ExportMetricHandler;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu,
@@ -190,18 +191,16 @@ impl Instance {
&mut self,
opts: impl Into<FrontendOptions> + TomlSerializable,
) -> Result<()> {
let opts: FrontendOptions = opts.into();
self.export_metrics_task =
ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins))
.context(StartServerSnafu)?;
let servers = Services::build(opts, Arc::new(self.clone()), self.plugins.clone()).await?;
self.servers = Arc::new(servers);

Ok(())
}

pub fn build_export_metrics_task(&mut self, opts: &ExportMetricsOption) -> Result<()> {
self.export_metrics_task =
ExportMetricsTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?;
Ok(())
}

pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
@@ -232,7 +231,15 @@ impl FrontendInstance for Instance {
self.script_executor.start(self)?;

if let Some(t) = self.export_metrics_task.as_ref() {
t.start()
if t.send_by_handler {
let handler = ExportMetricHandler::new_handler(
self.inserter.clone(),
self.statement_executor.clone(),
);
t.start(Some(handler)).context(StartServerSnafu)?
} else {
t.start(None).context(StartServerSnafu)?;
}
}

futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move {
54 changes: 53 additions & 1 deletion src/frontend/src/instance/prom_store.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use async_trait::async_trait;
@@ -21,10 +23,14 @@ use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::prom_store::{self, Metrics};
use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
use servers::query_handler::{
PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

@@ -209,3 +215,49 @@ impl PromStoreProtocolHandler for Instance {
todo!();
}
}

/// This handler is mainly used for `frontend` or `standalone` to directly import
/// the metrics collected by itself, thereby avoiding importing metrics through the network,
/// thus reducing compression and network transmission overhead,
/// so only implement `PromStoreProtocolHandler::write` method.
pub struct ExportMetricHandler {
inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
}

impl ExportMetricHandler {
pub fn new_handler(
inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
) -> PromStoreProtocolHandlerRef {
Arc::new(Self {
inserter,
statement_executor,
})
}
}

#[async_trait]
impl PromStoreProtocolHandler for ExportMetricHandler {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?;
self.inserter
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
Ok(())
}

async fn read(
&self,
_request: ReadRequest,
_ctx: QueryContextRef,
) -> ServerResult<PromStoreResponse> {
unreachable!();
}

async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
unreachable!();
}
}
2 changes: 1 addition & 1 deletion src/meta-srv/src/bootstrap.rs
@@ -94,7 +94,7 @@ impl MetaSrvInstance {
self.meta_srv.try_start().await?;

if let Some(t) = self.export_metrics_task.as_ref() {
t.start()
t.start(None).context(InitExportMetricsTaskSnafu)?
}

let (tx, rx) = mpsc::channel::<()>(1);