diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 53d3b8d1f3ea..7be645b36405 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -24,6 +24,7 @@ use axum::http::header::CONTENT_TYPE; use axum::http::{Request, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::{async_trait, BoxError, Extension, TypedHeader}; +use common_query::{Output, OutputData}; use common_telemetry::{error, warn}; use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu}; use pipeline::util::to_pipeline_version; @@ -40,6 +41,10 @@ use crate::error::{ use crate::http::greptime_manage_resp::GreptimedbManageResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::HttpResponse; +use crate::metrics::{ + METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED, + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE, +}; use crate::query_handler::LogHandlerRef; #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] @@ -298,14 +303,27 @@ async fn ingest_logs_inner( pipeline_data: PipelineValue, query_ctx: QueryContextRef, ) -> Result { - let start = std::time::Instant::now(); + let db = query_ctx.get_db_string(); + let exec_timer = std::time::Instant::now(); let pipeline = state .get_pipeline(&pipeline_name, version, query_ctx.clone()) .await?; + + let transform_timer = std::time::Instant::now(); let transformed_data: Rows = pipeline .exec(pipeline_data) - .map_err(|reason| PipelineTransformSnafu { reason }.build()) + .inspect(|_| { + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); + }) + .map_err(|reason| { + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); + PipelineTransformSnafu { reason }.build() + }) .context(PipelineSnafu)?; let insert_request = RowInsertRequest { @@ -317,9 +335,26 @@ async fn ingest_logs_inner( }; let output = state.insert_logs(insert_requests, query_ctx).await; + if let Ok(Output { + data: OutputData::AffectedRows(rows), + meta: _, + }) = &output + { + METRIC_HTTP_LOGS_INGESTION_COUNTER + .with_label_values(&[db.as_str()]) + .inc_by(*rows as u64); + METRIC_HTTP_LOGS_INGESTION_ELAPSED + .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) + .observe(exec_timer.elapsed().as_secs_f64()); + } else { + METRIC_HTTP_LOGS_INGESTION_ELAPSED + .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) + .observe(exec_timer.elapsed().as_secs_f64()); + } + let response = GreptimedbV1Response::from_output(vec![output]) .await - .with_execution_time(start.elapsed().as_millis() as u64); + .with_execution_time(exec_timer.elapsed().as_millis() as u64); Ok(response) } diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index ac599df53565..b5924307253e 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -44,6 +44,10 @@ pub(crate) const METRIC_POSTGRES_SIMPLE_QUERY: &str = "simple"; pub(crate) const METRIC_POSTGRES_EXTENDED_QUERY: &str = "extended"; pub(crate) const METRIC_METHOD_LABEL: &str = "method"; pub(crate) const METRIC_PATH_LABEL: &str = "path"; +pub(crate) const METRIC_RESULT_LABEL: &str = "result"; + +pub(crate) const METRIC_SUCCESS_VALUE: &str = "success"; +pub(crate) const METRIC_FAILURE_VALUE: &str = "failure"; lazy_static! { pub static ref METRIC_ERROR_COUNTER: IntCounterVec = register_int_counter_vec!( @@ -130,6 +134,26 @@ lazy_static! { &[METRIC_DB_LABEL] ) .unwrap(); + pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!( + "greptime_servers_http_logs_ingestion_counter", + "servers http logs ingestion counter", + &[METRIC_DB_LABEL] + ) + .unwrap(); + pub static ref METRIC_HTTP_LOGS_INGESTION_ELAPSED: HistogramVec = + register_histogram_vec!( + "greptime_servers_http_logs_ingestion_elapsed", + "servers http logs ingestion elapsed", + &[METRIC_DB_LABEL, METRIC_RESULT_LABEL] + ) + .unwrap(); + pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec = + register_histogram_vec!( + "greptime_servers_http_logs_transform_elapsed", + "servers http logs transform elapsed", + &[METRIC_DB_LABEL, METRIC_RESULT_LABEL] + ) + .unwrap(); pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!( "greptime_servers_mysql_connection_count", "servers mysql connection count"