diff --git a/config/frontend.example.toml b/config/frontend.example.toml index c8fe80cb14f7..ed4809ca84f7 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -57,6 +57,9 @@ enable = true # Prometheus remote storage options, see `standalone.example.toml`. [prom_store] enable = true +# Whether to store the data from Prometheus remote write in metric engine. +# true by default +with_metric_engine = true # Metasrv client options, see `datanode.example.toml`. [meta_client] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 8b1644b9e243..b30b0d58b78a 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -81,6 +81,9 @@ enable = true [prom_store] # Whether to enable Prometheus remote write and read in HTTP API, true by default. enable = true +# Whether to store the data from Prometheus remote write in metric engine. +# true by default +with_metric_engine = true [wal] # Available wal providers: diff --git a/src/common/query/src/prelude.rs b/src/common/query/src/prelude.rs index b334edb9030c..de71ee107979 100644 --- a/src/common/query/src/prelude.rs +++ b/src/common/query/src/prelude.rs @@ -25,3 +25,5 @@ pub const GREPTIME_TIMESTAMP: &str = "greptime_timestamp"; pub const GREPTIME_VALUE: &str = "greptime_value"; /// Default counter column name for OTLP metrics. pub const GREPTIME_COUNT: &str = "greptime_count"; +/// Default physical table name +pub const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table"; diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 2ba67483d2dc..cd678b78601e 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -195,6 +195,18 @@ impl Instance { .context(TableOperationSnafu) } + pub async fn handle_metric_row_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + physical_table: String, + ) -> Result { + self.inserter + .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table) + .await + .context(TableOperationSnafu) + } + pub async fn handle_deletes( &self, requests: DeleteRequests, diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 8821ff9a4ffa..0f0833785061 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; +use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; @@ -27,6 +28,7 @@ use operator::insert::InserterRef; use operator::statement::StatementExecutor; use prost::Message; use servers::error::{self, AuthSnafu, Result as ServerResult}; +use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::prom_store::{self, Metrics}; use servers::query_handler::{ PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse, @@ -153,18 +155,36 @@ impl Instance { #[async_trait] impl PromStoreProtocolHandler for Instance { - async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> { + async fn write( + &self, + request: WriteRequest, + ctx: QueryContextRef, + with_metric_engine: bool, + ) -> ServerResult<()> { self.plugins .get::() .as_ref() .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite) .context(AuthSnafu)?; + let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?; - let _ = self - .handle_row_inserts(requests, ctx) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; + if with_metric_engine { + let physical_table = ctx + .extension(PHYSICAL_TABLE_PARAM) + .unwrap_or(GREPTIME_PHYSICAL_TABLE) + .to_string(); + let _ = self + .handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + } else { + let _ = self + .handle_row_inserts(requests, ctx.clone()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + } PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); Ok(()) @@ -239,10 +259,20 @@ impl ExportMetricHandler { #[async_trait] impl PromStoreProtocolHandler for ExportMetricHandler { - async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> { + async fn write( + &self, + request: WriteRequest, + ctx: QueryContextRef, + _: bool, + ) -> ServerResult<()> { let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?; self.inserter - .handle_row_inserts(requests, ctx, self.statement_executor.as_ref()) + .handle_metric_row_inserts( + requests, + ctx, + &self.statement_executor, + GREPTIME_PHYSICAL_TABLE.to_string(), + ) .await .map_err(BoxedError::new) .context(error::ExecuteGrpcQuerySnafu)?; diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 9e9480556f92..5653905732cb 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -136,6 +136,8 @@ impl Services { let _ = http_server_builder .with_prom_handler(instance.clone()) .with_prometheus_handler(instance.clone()); + http_server_builder + .set_prom_store_with_metric_engine(opts.prom_store.with_metric_engine); } if opts.otlp.enable { diff --git a/src/frontend/src/service_config/prom_store.rs b/src/frontend/src/service_config/prom_store.rs index 877ff42b78fe..b3adf889d263 100644 --- a/src/frontend/src/service_config/prom_store.rs +++ b/src/frontend/src/service_config/prom_store.rs @@ -17,11 +17,15 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct PromStoreOptions { pub enable: bool, + pub with_metric_engine: bool, } impl Default for PromStoreOptions { fn default() -> Self { - Self { enable: true } + Self { + enable: true, + with_metric_engine: true, + } } } @@ -33,5 +37,6 @@ mod tests { fn test_prom_store_options() { let default = PromStoreOptions::default(); assert!(default.enable); + assert!(default.with_metric_engine) } } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 30b732a5d2d9..bffff45806c3 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -18,13 +18,15 @@ use std::sync::Arc; use api::v1::alter_expr::Kind; use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader}; use api::v1::{ - AlterExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest, RowInsertRequests, + AlterExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest, + RowInsertRequests, SemanticType, }; use catalog::CatalogManagerRef; use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; use common_meta::peer::Peer; +use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{error, info}; @@ -35,6 +37,9 @@ use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; use snafu::prelude::*; use sql::statements::insert::Insert; +use store_api::metric_engine_consts::{ + LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY, +}; use table::requests::InsertRequest as TableInsertRequest; use table::table_reference::TableReference; use table::TableRef; @@ -95,7 +100,7 @@ impl Inserter { }); validate_column_count_match(&requests)?; - self.create_or_alter_tables_on_demand(&requests, &ctx, statement_executor) + self.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor) .await?; let inserts = RowToRegion::new( self.catalog_manager.as_ref(), @@ -109,6 +114,44 @@ impl Inserter { Ok(Output::AffectedRows(affected_rows as _)) } + /// Handle row inserts request with metric engine. + pub async fn handle_metric_row_inserts( + &self, + mut requests: RowInsertRequests, + ctx: QueryContextRef, + statement_executor: &StatementExecutor, + physical_table: String, + ) -> Result { + // remove empty requests + requests.inserts.retain(|req| { + req.rows + .as_ref() + .map(|r| !r.rows.is_empty()) + .unwrap_or_default() + }); + validate_column_count_match(&requests)?; + + // check and create physical table + self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor) + .await?; + + // check and create logical tables + self.create_or_alter_tables_on_demand( + &requests, + &ctx, + Some(physical_table.to_string()), + statement_executor, + ) + .await?; + let inserts = + RowToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, &ctx) + .convert(requests) + .await?; + + let affected_rows = self.do_request(inserts, &ctx).await?; + Ok(Output::AffectedRows(affected_rows as _)) + } + pub async fn handle_table_insert( &self, request: TableInsertRequest, @@ -206,9 +249,10 @@ impl Inserter { &self, requests: &RowInsertRequests, ctx: &QueryContextRef, + on_physical_table: Option, statement_executor: &StatementExecutor, ) -> Result<()> { - // TODO(jeremy): create and alter in batch? + // TODO(jeremy): create and alter in batch? (from `handle_metric_row_inserts`) for req in &requests.inserts { let catalog = ctx.current_catalog(); let schema = ctx.current_schema(); @@ -219,13 +263,76 @@ impl Inserter { self.alter_table_on_demand(req, table, ctx, statement_executor) .await? } - None => self.create_table(req, ctx, statement_executor).await?, + None => { + self.create_table(req, ctx, &on_physical_table, statement_executor) + .await? + } } } Ok(()) } + async fn create_physical_table_on_demand( + &self, + ctx: &QueryContextRef, + physical_table: String, + statement_executor: &StatementExecutor, + ) -> Result<()> { + let catalog_name = ctx.current_catalog(); + let schema_name = ctx.current_schema(); + + // check if exist + if self + .get_table(catalog_name, schema_name, &physical_table) + .await? + .is_some() + { + return Ok(()); + } + + let table_reference = TableReference::full(catalog_name, schema_name, &physical_table); + info!("Physical metric table `{table_reference}` does not exist, try creating table"); + + // schema with timestamp and field column + let default_schema = vec![ + ColumnSchema { + column_name: GREPTIME_TIMESTAMP.to_string(), + datatype: ColumnDataType::TimestampMillisecond as _, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + }, + ColumnSchema { + column_name: GREPTIME_VALUE.to_string(), + datatype: ColumnDataType::Float64 as _, + semantic_type: SemanticType::Field as _, + datatype_extension: None, + }, + ]; + let create_table_expr = &mut build_create_table_expr(&table_reference, &default_schema)?; + + create_table_expr.engine = METRIC_ENGINE_NAME.to_string(); + create_table_expr + .table_options + .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string()); + + // create physical table + let res = statement_executor + .create_table_inner(create_table_expr, None) + .await; + + match res { + Ok(_) => { + info!("Successfully created table {table_reference}",); + Ok(()) + } + Err(err) => { + error!("Failed to create table {table_reference}: {err}",); + Err(err) + } + } + } + async fn get_table( &self, catalog: &str, @@ -289,10 +396,14 @@ impl Inserter { } } + /// Create a table with schema from insert request. + /// + /// To create a metric engine logical table, specify the `on_physical_table` parameter. async fn create_table( &self, req: &RowInsertRequest, ctx: &QueryContextRef, + on_physical_table: &Option, statement_executor: &StatementExecutor, ) -> Result<()> { let table_ref = @@ -301,10 +412,15 @@ impl Inserter { let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?; - info!( - "Table {}.{}.{} does not exist, try create table", - table_ref.catalog, table_ref.schema, table_ref.table, - ); + if let Some(physical_table) = on_physical_table { + create_table_expr.engine = METRIC_ENGINE_NAME.to_string(); + create_table_expr.table_options.insert( + LOGICAL_TABLE_METADATA_KEY.to_string(), + physical_table.clone(), + ); + } + + info!("Table `{table_ref}` does not exist, try creating table",); // TODO(weny): multiple regions table. let res = statement_executor diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index cbbe34b81553..a19d39fe3c06 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -430,6 +430,11 @@ pub enum Error { #[snafu(display("Missing query context"))] MissingQueryContext { location: Location }, + + #[snafu(display( + "Invalid parameter, physical_table is not expected when metric engine is disabled" + ))] + UnexpectedPhysicalTable { location: Location }, } pub type Result = std::result::Result; @@ -488,7 +493,8 @@ impl ErrorExt for Error { | UrlDecode { .. } | IncompatibleSchema { .. } | MissingQueryContext { .. } - | MysqlValueConversion { .. } => StatusCode::InvalidArguments, + | MysqlValueConversion { .. } + | UnexpectedPhysicalTable { .. } => StatusCode::InvalidArguments, InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. } diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs index 12ca611aec12..014f58e52678 100644 --- a/src/servers/src/export_metrics.rs +++ b/src/servers/src/export_metrics.rs @@ -256,7 +256,7 @@ pub async fn write_system_metric_by_handler( filter.as_ref(), Timestamp::current_millis().value(), ); - if let Err(e) = handler.write(request, ctx.clone()).await { + if let Err(e) = handler.write(request, ctx.clone(), false).await { error!("report export metrics by handler failed, error {}", e); } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 277e105b602d..e114425e522f 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -99,9 +99,9 @@ pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"] #[derive(Default)] pub struct HttpServer { + // server handlers sql_handler: Option, grpc_handler: Option, - options: HttpOptions, influxdb_handler: Option, opentsdb_handler: Option, prom_handler: Option, @@ -111,8 +111,14 @@ pub struct HttpServer { shutdown_tx: Mutex>>, user_provider: Option, metrics_handler: Option, - greptime_config_options: Option, + + // plugins plugins: Plugins, + + // server configs + options: HttpOptions, + greptime_config_options: Option, + prom_store_with_metric_engine: bool, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -421,6 +427,7 @@ impl HttpServerBuilder { shutdown_tx: Mutex::new(None), greptime_config_options: None, plugins: Default::default(), + prom_store_with_metric_engine: false, }, } } @@ -485,6 +492,11 @@ impl HttpServerBuilder { self } + pub fn set_prom_store_with_metric_engine(&mut self, with_metric_engine: bool) -> &mut Self { + self.inner.prom_store_with_metric_engine = with_metric_engine; + self + } + pub fn build(&mut self) -> HttpServer { std::mem::take(self).inner } @@ -674,10 +686,16 @@ impl HttpServer { } fn route_prom(&self, prom_handler: PromStoreProtocolHandlerRef) -> Router { - Router::new() - .route("/write", routing::post(prom_store::remote_write)) - .route("/read", routing::post(prom_store::remote_read)) - .with_state(prom_handler) + let mut router = Router::new().route("/read", routing::post(prom_store::remote_read)); + if self.prom_store_with_metric_engine { + router = router.route("/write", routing::post(prom_store::remote_write)); + } else { + router = router.route( + "/write", + routing::post(prom_store::route_write_without_metric_engine), + ); + } + router.with_state(prom_handler) } fn route_influxdb(&self, influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router { diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index c3da6e8e0d8f..1eabd3245868 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -60,12 +60,14 @@ pub async fn inner_auth( ) -> std::result::Result, Response> { // 1. prepare let (catalog, schema) = extract_catalog_and_schema(&req); - let timezone = extract_timezone(&req); - let query_ctx = QueryContextBuilder::default() + // TODO(ruihang): move this out of auth module + let timezone = Arc::new(extract_timezone(&req)); + let query_ctx_builder = QueryContextBuilder::default() .current_catalog(catalog.to_string()) .current_schema(schema.to_string()) - .timezone(Arc::new(timezone)) - .build(); + .timezone(timezone); + + let query_ctx = query_ctx_builder.build(); let need_auth = need_auth(&req); let is_influxdb = req.uri().path().contains("influxdb"); diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index b994da945046..7be168d54598 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -17,7 +17,10 @@ use headers::{Header, HeaderName, HeaderValue}; pub const GREPTIME_DB_HEADER_FORMAT: &str = "x-greptime-format"; pub const GREPTIME_DB_HEADER_EXECUTION_TIME: &str = "x-greptime-execution-time"; +/// Header key of `db-name`. Example format of the header value is `greptime-public`. pub static GREPTIME_DB_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-db-name"); +/// Header key of query specific timezone. +/// Example format of the header value is `Asia/Shanghai` or `+08:00`. pub static GREPTIME_TIMEZONE_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-timezone"); diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index af1f2a5261da..2ca07c68ca9e 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -12,12 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use api::prom_store::remote::{ReadRequest, WriteRequest}; use axum::extract::{Query, RawBody, State}; use axum::http::{header, StatusCode}; use axum::response::IntoResponse; use axum::Extension; use common_catalog::consts::DEFAULT_SCHEMA_NAME; +use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use hyper::Body; use prost::Message; use schemars::JsonSchema; @@ -25,25 +28,32 @@ use serde::{Deserialize, Serialize}; use session::context::QueryContextRef; use snafu::prelude::*; -use crate::error::{self, Result}; +use crate::error::{self, Result, UnexpectedPhysicalTableSnafu}; use crate::prom_store::snappy_decompress; use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse}; +pub const PHYSICAL_TABLE_PARAM: &str = "physical_table"; + #[derive(Debug, Serialize, Deserialize, JsonSchema)] pub struct DatabaseQuery { pub db: Option, + /// Specify which physical table to use for storing metrics. + /// This only works on remote write requests. + pub physical_table: Option, } impl Default for DatabaseQuery { fn default() -> DatabaseQuery { Self { db: Some(DEFAULT_SCHEMA_NAME.to_string()), + physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()), } } } +/// Same with [remote_write] but won't store data to metric engine. #[axum_macros::debug_handler] -pub async fn remote_write( +pub async fn route_write_without_metric_engine( State(handler): State, Query(params): Query, Extension(query_ctx): Extension, @@ -51,12 +61,39 @@ pub async fn remote_write( ) -> Result<(StatusCode, ())> { let request = decode_remote_write_request(body).await?; let db = params.db.clone().unwrap_or_default(); + let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED + .with_label_values(&[db.as_str()]) + .start_timer(); + + // reject if physical table is specified when metric engine is disabled + if params.physical_table.is_some() { + return UnexpectedPhysicalTableSnafu {}.fail(); + } + + handler.write(request, query_ctx, false).await?; + Ok((StatusCode::NO_CONTENT, ())) +} + +#[axum_macros::debug_handler] +pub async fn remote_write( + State(handler): State, + Query(params): Query, + Extension(mut query_ctx): Extension, + RawBody(body): RawBody, +) -> Result<(StatusCode, ())> { + let request = decode_remote_write_request(body).await?; + let db = params.db.clone().unwrap_or_default(); + if let Some(physical_table) = params.physical_table { + let mut new_query_ctx = query_ctx.as_ref().clone(); + new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table); + query_ctx = Arc::new(new_query_ctx); + } let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); - handler.write(request, query_ctx).await?; + handler.write(request, query_ctx, true).await?; Ok((StatusCode::NO_CONTENT, ())) } diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 06ef6bbda5c0..d36b7418b022 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -89,7 +89,12 @@ pub struct PromStoreResponse { #[async_trait] pub trait PromStoreProtocolHandler { /// Handling prometheus remote write requests - async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()>; + async fn write( + &self, + request: WriteRequest, + ctx: QueryContextRef, + with_metric_engine: bool, + ) -> Result<()>; /// Handling prometheus remote read requests async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result; /// Handling push gateway requests diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index ea9505936fc2..9b5fe6a37efa 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -56,7 +56,7 @@ impl GrpcQueryHandler for DummyInstance { #[async_trait] impl PromStoreProtocolHandler for DummyInstance { - async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()> { + async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result<()> { let _ = self .tx .send((ctx.current_schema().to_owned(), request.encode_to_vec())) diff --git a/src/session/src/context.rs b/src/session/src/context.rs index b87e4caac559..53cc2db7441a 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; use std::sync::Arc; @@ -40,7 +41,9 @@ pub struct QueryContext { current_user: ArcSwap>, #[builder(setter(custom))] timezone: ArcSwap, - sql_dialect: Box, + sql_dialect: Arc, + #[builder(default)] + extension: HashMap, } impl QueryContextBuilder { @@ -61,6 +64,19 @@ impl Display for QueryContext { } } +impl Clone for QueryContext { + fn clone(&self) -> Self { + Self { + current_catalog: self.current_catalog.clone(), + current_schema: self.current_schema.clone(), + current_user: self.current_user.load().clone().into(), + timezone: self.timezone.load().clone().into(), + sql_dialect: self.sql_dialect.clone(), + extension: self.extension.clone(), + } + } +} + impl From<&RegionRequestHeader> for QueryContext { fn from(value: &RegionRequestHeader) -> Self { let (catalog, schema) = parse_catalog_and_schema_from_db_string(&value.dbname); @@ -70,7 +86,8 @@ impl From<&RegionRequestHeader> for QueryContext { current_user: Default::default(), // for request send to datanode, all timestamp have converted to UTC, so timezone is not important timezone: ArcSwap::new(Arc::new(get_timezone(None).clone())), - sql_dialect: Box::new(GreptimeDbDialect {}), + sql_dialect: Arc::new(GreptimeDbDialect {}), + extension: Default::default(), } } } @@ -139,6 +156,14 @@ impl QueryContext { let _ = self.timezone.swap(Arc::new(timezone)); } + pub fn set_extension, S2: Into>(&mut self, key: S1, value: S2) { + self.extension.insert(key.into(), value.into()); + } + + pub fn extension>(&self, key: S) -> Option<&str> { + self.extension.get(key.as_ref()).map(|v| v.as_str()) + } + /// SQL like `set variable` may change timezone or other info in `QueryContext`. /// We need persist these change in `Session`. pub fn update_session(&self, session: &SessionRef) { @@ -166,9 +191,17 @@ impl QueryContextBuilder { .unwrap_or(ArcSwap::new(Arc::new(get_timezone(None).clone()))), sql_dialect: self .sql_dialect - .unwrap_or_else(|| Box::new(GreptimeDbDialect {})), + .unwrap_or_else(|| Arc::new(GreptimeDbDialect {})), + extension: self.extension.unwrap_or_default(), }) } + + pub fn set_extension(mut self, key: String, value: String) -> Self { + self.extension + .get_or_insert_with(HashMap::new) + .insert(key, value); + self + } } #[derive(Debug)] @@ -207,10 +240,10 @@ pub enum Channel { } impl Channel { - pub fn dialect(&self) -> Box { + pub fn dialect(&self) -> Arc { match self { - Channel::Mysql => Box::new(MySqlDialect {}), - Channel::Postgres => Box::new(PostgreSqlDialect {}), + Channel::Mysql => Arc::new(MySqlDialect {}), + Channel::Postgres => Arc::new(PostgreSqlDialect {}), } } } diff --git a/tests-integration/src/prom_store.rs b/tests-integration/src/prom_store.rs index 83cdba88e21b..4c0c7a4a530f 100644 --- a/tests-integration/src/prom_store.rs +++ b/tests-integration/src/prom_store.rs @@ -23,6 +23,7 @@ mod tests { use common_catalog::consts::DEFAULT_CATALOG_NAME; use frontend::instance::Instance; use prost::Message; + use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::prom_store; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::PromStoreProtocolHandler; @@ -32,30 +33,69 @@ mod tests { use crate::tests; #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_prom_store_remote_rw() { - let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_prom_store_remote_rw") - .build() - .await; + async fn test_standalone_prom_store_remote_rw_default_physical_table() { + common_telemetry::init_default_ut_logging(); + let standalone = GreptimeDbStandaloneBuilder::new( + "test_standalone_prom_store_remote_rw_default_physical_table", + ) + .build() + .await; let instance = &standalone.instance; - test_prom_store_remote_rw(instance).await; + test_prom_store_remote_rw(instance, None).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_prom_store_remote_rw_default_physical_table() { + common_telemetry::init_default_ut_logging(); + let distributed = tests::create_distributed_instance( + "test_distributed_prom_store_remote_rw_default_physical_table", + ) + .await; + test_prom_store_remote_rw(&distributed.frontend(), None).await; } #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_prom_store_remote_rw() { - let distributed = - tests::create_distributed_instance("test_distributed_prom_store_remote_rw").await; - test_prom_store_remote_rw(&distributed.frontend()).await; + async fn test_standalone_prom_store_remote_rw_custom_physical_table() { + common_telemetry::init_default_ut_logging(); + let standalone = GreptimeDbStandaloneBuilder::new( + "test_standalone_prom_store_remote_rw_custom_physical_table", + ) + .build() + .await; + let instance = &standalone.instance; + + test_prom_store_remote_rw(instance, Some("my_custom_physical_table".to_string())).await; } - async fn test_prom_store_remote_rw(instance: &Arc) { + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_prom_store_remote_rw_custom_physical_table() { + common_telemetry::init_default_ut_logging(); + let distributed = tests::create_distributed_instance( + "test_distributed_prom_store_remote_rw_custom_physical_table", + ) + .await; + test_prom_store_remote_rw( + &distributed.frontend(), + Some("my_custom_physical_table".to_string()), + ) + .await; + } + + async fn test_prom_store_remote_rw(instance: &Arc, physical_table: Option) { let write_request = WriteRequest { timeseries: prom_store::mock_timeseries(), ..Default::default() }; let db = "prometheus"; - let ctx = QueryContext::with(DEFAULT_CATALOG_NAME, db); + let mut ctx = Arc::into_inner(QueryContext::with(DEFAULT_CATALOG_NAME, db)).unwrap(); + + // set physical table if provided + if let Some(physical_table) = &physical_table { + ctx.set_extension(PHYSICAL_TABLE_PARAM.to_string(), physical_table.clone()); + } + let ctx = Arc::new(ctx); assert!(SqlQueryHandler::do_query( instance.as_ref(), @@ -67,7 +107,10 @@ mod tests { .unwrap() .is_ok()); - instance.write(write_request, ctx.clone()).await.unwrap(); + instance + .write(write_request, ctx.clone(), true) + .await + .unwrap(); let read_request = ReadRequest { queries: vec![ @@ -102,7 +145,7 @@ mod tests { ..Default::default() }; - let resp = instance.read(read_request, ctx).await.unwrap(); + let resp = instance.read(read_request, ctx.clone()).await.unwrap(); assert_eq!(resp.content_type, "application/x-protobuf"); assert_eq!(resp.content_encoding, "snappy"); let body = prom_store::snappy_decompress(&resp.body).unwrap(); @@ -179,5 +222,11 @@ mod tests { } ] ); + + // check physical table if provided + if let Some(physical_table) = physical_table { + let sql = format!("DESC TABLE {physical_table};"); + instance.do_query(&sql, ctx).await[0].as_ref().unwrap(); + } } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7182c72c19b9..397865dedfe9 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -706,6 +706,7 @@ enable = true [frontend.prom_store] enable = true +with_metric_engine = true [frontend.otlp] enable = true