diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 1dcddcd126e3..65b14c2d7bb5 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -135,6 +135,13 @@ pub enum Error { operator: String, location: Location, }, + + #[snafu(display("Matcher operator {matcher_op} is not supported for {matcher}"))] + UnsupportedMatcherOp { + matcher_op: String, + matcher: String, + location: Location, + }, } impl ErrorExt for Error { @@ -157,6 +164,7 @@ impl ErrorExt for Error { | DataFusionPlanning { .. } | MultiFieldsNotSupported { .. } | UnexpectedPlanExpr { .. } + | UnsupportedMatcherOp { .. } | IllegalRange { .. } => StatusCode::InvalidArguments, UnknownTable { .. } | EmptyRange { .. } => StatusCode::Internal, diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index a2d0c2d79a63..6cd0e17828fe 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -49,8 +49,8 @@ use crate::error::{ ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, - UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, - ZeroRangeSelectorSnafu, + UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, + UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu, }; use crate::extension_plan::{ build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, @@ -79,6 +79,9 @@ const DEFAULT_FIELD_COLUMN: &str = "value"; /// Special modifier to project field columns under multi-field mode const FIELD_COLUMN_MATCHER: &str = "__field__"; +/// Special modifier for cross schema query +const SCHEMA_COLUMN_MATCHER: &str = "__schema__"; + #[derive(Default, Debug, Clone)] struct PromPlannerContext { // query parameters @@ -93,6 +96,7 @@ struct PromPlannerContext { field_columns: Vec, tag_columns: Vec, field_column_matcher: Option>, + schema_name: Option, /// The range in millisecond of range selector. None if there is no range selector. range: Option, } @@ -115,9 +119,16 @@ impl PromPlannerContext { self.field_columns = vec![]; self.tag_columns = vec![]; self.field_column_matcher = None; + self.schema_name = None; self.range = None; } + /// Reset table name and schema to empty + fn reset_table_name_and_schema(&mut self) { + self.table_name = Some(String::new()); + self.schema_name = None; + } + /// Check if `le` is present in tag columns fn has_le_tag(&self) -> bool { self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME)) @@ -205,7 +216,7 @@ impl PromPlanner { (Some(lhs), Some(rhs)) => { self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string()); self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; - self.ctx.table_name = Some(String::new()); + self.ctx.reset_table_name_and_schema(); let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; let mut field_expr = field_expr_builder(lhs, rhs)?; @@ -292,16 +303,16 @@ impl PromPlanner { (None, None) => { let left_input = self.prom_expr_to_plan(*lhs.clone()).await?; let left_field_columns = self.ctx.field_columns.clone(); - let mut left_table_ref = OwnedTableReference::bare( - self.ctx.table_name.clone().unwrap_or_default(), - ); + let mut left_table_ref = self + .table_ref() + .unwrap_or_else(|_| OwnedTableReference::bare("")); let left_context = self.ctx.clone(); let right_input = self.prom_expr_to_plan(*rhs.clone()).await?; let right_field_columns = self.ctx.field_columns.clone(); - let mut right_table_ref = OwnedTableReference::bare( - self.ctx.table_name.clone().unwrap_or_default(), - ); + let mut right_table_ref = self + .table_ref() + .unwrap_or_else(|_| OwnedTableReference::bare("")); let right_context = self.ctx.clone(); // TODO(ruihang): avoid join if left and right are the same table @@ -375,7 +386,7 @@ impl PromPlanner { PromExpr::NumberLiteral(NumberLiteral { val }) => { self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string()); self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; - self.ctx.table_name = Some(String::new()); + self.ctx.reset_table_name_and_schema(); let literal_expr = df_prelude::lit(*val); LogicalPlan::Extension(Extension { @@ -395,7 +406,7 @@ impl PromPlanner { PromExpr::StringLiteral(StringLiteral { val }) => { self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string()); self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; - self.ctx.table_name = Some(String::new()); + self.ctx.reset_table_name_and_schema(); let literal_expr = df_prelude::lit(val.to_string()); LogicalPlan::Extension(Extension { @@ -489,7 +500,7 @@ impl PromPlanner { self.prom_expr_to_plan(prom_expr).await? } else { self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); - self.ctx.table_name = Some(String::new()); + self.ctx.reset_table_name_and_schema(); LogicalPlan::Extension(Extension { node: Arc::new( EmptyMetric::new( @@ -595,6 +606,15 @@ impl PromPlanner { .field_column_matcher .get_or_insert_default() .push(matcher.clone()); + } else if matcher.name == SCHEMA_COLUMN_MATCHER { + ensure!( + matcher.op == MatchOp::Equal, + UnsupportedMatcherOpSnafu { + matcher: matcher.name.to_string(), + matcher_op: matcher.op.to_string(), + } + ); + self.ctx.schema_name = Some(matcher.value.clone()); } else if matcher.name != METRIC_NAME { let _ = matchers.insert(matcher.clone()); } @@ -609,8 +629,6 @@ impl PromPlanner { label_matchers: Matchers, is_range_selector: bool, ) -> Result { - let table_name = self.ctx.table_name.clone().unwrap(); - // make filter exprs let offset_duration = match offset { Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond, @@ -633,8 +651,9 @@ impl PromPlanner { ))); // make table scan with filter exprs + let table_ref = self.table_ref()?; let mut table_scan = self - .create_table_scan_plan(&table_name, scan_filters.clone()) + .create_table_scan_plan(table_ref.clone(), scan_filters.clone()) .await?; // make a projection plan if there is any `__field__` matcher @@ -730,7 +749,9 @@ impl PromPlanner { self.ctx .time_index_column .clone() - .with_context(|| TimeIndexNotFoundSnafu { table: table_name })?, + .with_context(|| TimeIndexNotFoundSnafu { + table: table_ref.to_quoted_string(), + })?, is_range_selector, divide_plan, ); @@ -838,16 +859,32 @@ impl PromPlanner { Ok(exprs) } + fn table_ref(&self) -> Result { + let table_name = self + .ctx + .table_name + .clone() + .context(TableNameNotFoundSnafu)?; + + // set schema name if `__schema__` is given + let table_ref = if let Some(schema_name) = &self.ctx.schema_name { + TableReference::partial(schema_name, &table_name) + } else { + TableReference::bare(&table_name) + }; + + Ok(table_ref.to_owned_reference()) + } + /// Create a table scan plan and a filter plan with given filter. /// /// # Panic /// If the filter is empty async fn create_table_scan_plan( &mut self, - table_name: &str, + table_ref: OwnedTableReference, filter: Vec, ) -> Result { - let table_ref = OwnedTableReference::bare(table_name.to_string()); let provider = self .table_provider .resolve_table(table_ref.clone()) @@ -865,14 +902,10 @@ impl PromPlanner { /// Setup [PromPlannerContext]'s state fields. async fn setup_context(&mut self) -> Result<()> { - let table_name = self - .ctx - .table_name - .clone() - .context(TableNameNotFoundSnafu)?; + let table_ref = self.table_ref()?; let table = self .table_provider - .resolve_table(TableReference::bare(&table_name)) + .resolve_table(table_ref.clone()) .await .context(CatalogSnafu)? .as_any() @@ -888,7 +921,9 @@ impl PromPlanner { let time_index = table .schema() .timestamp_column() - .with_context(|| TimeIndexNotFoundSnafu { table: table_name })? + .with_context(|| TimeIndexNotFoundSnafu { + table: table_ref.to_quoted_string(), + })? .name .clone(); self.ctx.time_index_column = Some(time_index); @@ -1224,7 +1259,7 @@ impl PromPlanner { } utils::conjunction(exprs).context(ValueNotFoundSnafu { - table: self.ctx.table_name.clone().unwrap(), + table: self.table_ref()?.to_quoted_string(), }) } @@ -1354,7 +1389,7 @@ impl PromPlanner { // reuse `SPECIAL_TIME_FUNCTION` as name of time index column self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); - self.ctx.table_name = Some(String::new()); + self.ctx.reset_table_name_and_schema(); self.ctx.tag_columns = vec![]; self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()]; Ok(LogicalPlan::Extension(Extension { @@ -2014,57 +2049,61 @@ mod test { use super::*; async fn build_test_table_provider( - table_name: String, + table_name_tuples: &[(String, String)], num_tag: usize, num_field: usize, ) -> DfTableSourceProvider { - let mut columns = vec![]; - for i in 0..num_tag { - columns.push(ColumnSchema::new( - format!("tag_{i}"), - ConcreteDataType::string_datatype(), - false, - )); - } - columns.push( - ColumnSchema::new( - "timestamp".to_string(), - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ); - for i in 0..num_field { - columns.push(ColumnSchema::new( - format!("field_{i}"), - ConcreteDataType::float64_datatype(), - true, - )); - } - let schema = Arc::new(Schema::new(columns)); - let table_meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices((0..num_tag).collect()) - .value_indices((num_tag + 1..num_tag + 1 + num_field).collect()) - .next_column_id(1024) - .build() - .unwrap(); - let table_info = TableInfoBuilder::default() - .name(&table_name) - .meta(table_meta) - .build() - .unwrap(); - let table = EmptyTable::from_table_info(&table_info); let catalog_list = MemoryCatalogManager::with_default_setup(); - assert!(catalog_list - .register_table_sync(RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name, - table_id: 1024, - table, - }) - .is_ok()); + for (schema_name, table_name) in table_name_tuples { + let mut columns = vec![]; + for i in 0..num_tag { + columns.push(ColumnSchema::new( + format!("tag_{i}"), + ConcreteDataType::string_datatype(), + false, + )); + } + columns.push( + ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ); + for i in 0..num_field { + columns.push(ColumnSchema::new( + format!("field_{i}"), + ConcreteDataType::float64_datatype(), + true, + )); + } + let schema = Arc::new(Schema::new(columns)); + let table_meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices((0..num_tag).collect()) + .value_indices((num_tag + 1..num_tag + 1 + num_field).collect()) + .next_column_id(1024) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .name(table_name.to_string()) + .meta(table_meta) + .build() + .unwrap(); + let table = EmptyTable::from_table_info(&table_info); + + assert!(catalog_list + .register_table_sync(RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: schema_name.to_string(), + table_name: table_name.to_string(), + table_id: 1024, + table, + }) + .is_ok()); + } + DfTableSourceProvider::new(catalog_list, false, QueryContext::arc().as_ref()) } @@ -2096,7 +2135,12 @@ mod test { lookback_delta: Duration::from_secs(1), }; - let table_provider = build_test_table_provider("some_metric".to_string(), 1, 1).await; + let table_provider = build_test_table_provider( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt) .await .unwrap(); @@ -2301,7 +2345,12 @@ mod test { }; // test group by - let table_provider = build_test_table_provider("some_metric".to_string(), 2, 2).await; + let table_provider = build_test_table_provider( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 2, + 2, + ) + .await; let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone()) .await .unwrap(); @@ -2326,7 +2375,12 @@ mod test { labels: vec![String::from("tag_1")].into_iter().collect(), })); } - let table_provider = build_test_table_provider("some_metric".to_string(), 2, 2).await; + let table_provider = build_test_table_provider( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 2, + 2, + ) + .await; let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt) .await .unwrap(); @@ -2446,7 +2500,12 @@ mod test { lookback_delta: Duration::from_secs(1), }; - let table_provider = build_test_table_provider("some_metric".to_string(), 1, 1).await; + let table_provider = build_test_table_provider( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt) .await .unwrap(); @@ -2485,7 +2544,18 @@ mod test { lookback_delta: Duration::from_secs(1), }; - let table_provider = build_test_table_provider("some_metric".to_string(), 1, 1).await; + let table_provider = build_test_table_provider( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + "greptime_private".to_string(), + "some_alt_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt) .await .unwrap(); @@ -2724,7 +2794,12 @@ mod test { for case in cases { let prom_expr = parser::parse(case.0).unwrap(); eval_stmt.expr = prom_expr; - let table_provider = build_test_table_provider("some_metric".to_string(), 3, 3).await; + let table_provider = build_test_table_provider( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 3, + 3, + ) + .await; let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone()) .await .unwrap(); @@ -2743,9 +2818,66 @@ mod test { for case in bad_cases { let prom_expr = parser::parse(case).unwrap(); eval_stmt.expr = prom_expr; - let table_provider = build_test_table_provider("some_metric".to_string(), 3, 3).await; + let table_provider = build_test_table_provider( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 3, + 3, + ) + .await; let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone()).await; assert!(plan.is_err(), "case: {:?}", case); } } + + #[tokio::test] + async fn custom_schema() { + let query = "some_alt_metric{__schema__=\"greptime_private\"}"; + let expected = String::from( + "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: greptime_private.some_alt_metric.tag_0 DESC NULLS LAST, greptime_private.some_alt_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + ); + + indie_query_plan_compare(query, expected).await; + + let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric"; + let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\n Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: greptime_private.some_alt_metric.tag_0 DESC NULLS LAST, greptime_private.some_alt_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"); + + indie_query_plan_compare(query, expected).await; + } + + #[tokio::test] + async fn only_equals_is_supported_for_special_matcher() { + let queries = &[ + "some_alt_metric{__schema__!=\"greptime_private\"}", + "some_alt_metric{__schema__=~\"lalala\"}", + ]; + + for query in queries { + let prom_expr = parser::parse(query).unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + "greptime_private".to_string(), + "some_alt_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; + + let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt).await; + assert!(plan.is_err(), "query: {:?}", query); + } + } } diff --git a/src/session/src/context.rs b/src/session/src/context.rs index ab1e468dc6e3..181d46d87f81 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -110,7 +110,7 @@ impl QueryContext { .build() } - pub fn with_db_name(db_name: Option<&String>) -> QueryContextRef { + pub fn with_db_name(db_name: Option<&str>) -> QueryContextRef { let (catalog, schema) = db_name .map(|db| { let (catalog, schema) = parse_catalog_and_schema_from_db_string(db); diff --git a/tests-integration/src/tests/promql_test.rs b/tests-integration/src/tests/promql_test.rs index 19886a4a851e..8810f5d94664 100644 --- a/tests-integration/src/tests/promql_test.rs +++ b/tests-integration/src/tests/promql_test.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use common_query::Output; use frontend::instance::Instance; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use rstest::rstest; @@ -27,6 +28,35 @@ use super::test_util::{ }; use crate::tests::test_util::MockInstance; +#[allow(clippy::too_many_arguments)] +async fn promql_query( + ins: Arc, + promql: &str, + query_ctx: Arc, + start: SystemTime, + end: SystemTime, + interval: Duration, + lookback: Duration, +) -> operator::error::Result { + let query = PromQuery { + query: promql.to_string(), + ..PromQuery::default() + }; + let QueryStatement::Promql(mut eval_stmt) = + QueryLanguageParser::parse_promql(&query, &query_ctx).unwrap() + else { + unreachable!() + }; + eval_stmt.start = start; + eval_stmt.end = end; + eval_stmt.interval = interval; + eval_stmt.lookback_delta = lookback; + + ins.statement_executor() + .execute_stmt(QueryStatement::Promql(eval_stmt), query_ctx) + .await +} + #[allow(clippy::too_many_arguments)] async fn create_insert_query_assert( instance: Arc, @@ -54,25 +84,17 @@ async fn create_insert_query_assert( let _ = v.unwrap(); }); - let query = PromQuery { - query: promql.to_string(), - ..PromQuery::default() - }; - let QueryStatement::Promql(mut eval_stmt) = - QueryLanguageParser::parse_promql(&query, &QueryContext::arc()).unwrap() - else { - unreachable!() - }; - eval_stmt.start = start; - eval_stmt.end = end; - eval_stmt.interval = interval; - eval_stmt.lookback_delta = lookback; - - let query_output = instance - .statement_executor() - .execute_stmt(QueryStatement::Promql(eval_stmt), QueryContext::arc()) - .await - .unwrap(); + let query_output = promql_query( + instance, + promql, + QueryContext::arc(), + start, + end, + interval, + lookback, + ) + .await + .unwrap(); check_unordered_output_stream(query_output, expected).await; } @@ -524,3 +546,85 @@ async fn binary_op_plain_columns(instance: Arc) { ) .await; } + +#[apply(both_instances_cases)] +async fn cross_schema_query(instance: Arc) { + let ins = instance.frontend(); + + ins.do_query( + AGGREGATORS_CREATE_TABLE, + QueryContext::with_db_name(Some("greptime_private")), + ) + .await + .into_iter() + .for_each(|v| { + let _ = v.unwrap(); + }); + ins.do_query( + AGGREGATORS_INSERT_DATA, + QueryContext::with_db_name(Some("greptime_private")), + ) + .await + .into_iter() + .for_each(|v| { + let _ = v.unwrap(); + }); + + let start = UNIX_EPOCH; + let end = unix_epoch_plus_100s(); + let interval = Duration::from_secs(60); + let lookback_delta = Duration::from_secs(0); + + let query_output = promql_query( + ins.clone(), + r#"http_requests{__schema__="greptime_private"}"#, + QueryContext::arc(), + start, + end, + interval, + lookback_delta, + ) + .await + .unwrap(); + + let expected = r#"+------------+----------+------------+-------+---------------------+ +| job | instance | group | value | ts | ++------------+----------+------------+-------+---------------------+ +| api-server | 0 | production | 100.0 | 1970-01-01T00:00:00 | +| api-server | 0 | canary | 300.0 | 1970-01-01T00:00:00 | +| api-server | 1 | production | 200.0 | 1970-01-01T00:00:00 | +| api-server | 1 | canary | 400.0 | 1970-01-01T00:00:00 | +| app-server | 0 | canary | 700.0 | 1970-01-01T00:00:00 | +| app-server | 0 | production | 500.0 | 1970-01-01T00:00:00 | +| app-server | 1 | canary | 800.0 | 1970-01-01T00:00:00 | +| app-server | 1 | production | 600.0 | 1970-01-01T00:00:00 | ++------------+----------+------------+-------+---------------------+"#; + + check_unordered_output_stream(query_output, expected).await; + + let query_output = promql_query( + ins.clone(), + r#"http_requests"#, + QueryContext::arc(), + start, + end, + interval, + lookback_delta, + ) + .await; + assert!(query_output.is_err()); + + let query_output = promql_query( + ins.clone(), + r#"http_requests"#, + QueryContext::with_db_name(Some("greptime_private")), + start, + end, + interval, + lookback_delta, + ) + .await + .unwrap(); + + check_unordered_output_stream(query_output, expected).await; +} diff --git a/tests/cases/standalone/common/tql/basic.result b/tests/cases/standalone/common/tql/basic.result index f679dcb341d1..a0c244a48b31 100644 --- a/tests/cases/standalone/common/tql/basic.result +++ b/tests/cases/standalone/common/tql/basic.result @@ -32,6 +32,23 @@ TQL EVAL (0, 10, '5s') {__name__="test"}; | 2.0 | 1970-01-01T00:00:10 | a | +-----+---------------------+---+ +-- SQLNESS SORT_RESULT 2 1 +TQL EVAL (0, 10, '5s') test{__schema__="public"}; + ++-----+---------------------+---+ +| i | j | k | ++-----+---------------------+---+ +| 1.0 | 1970-01-01T00:00:05 | b | +| 1.0 | 1970-01-01T00:00:10 | b | +| 2.0 | 1970-01-01T00:00:05 | a | +| 2.0 | 1970-01-01T00:00:10 | a | ++-----+---------------------+---+ + +-- SQLNESS SORT_RESULT 2 1 +TQL EVAL (0, 10, '5s') test{__schema__="greptime_private"}; + +Error: 4001(TableNotFound), Table not found: greptime.greptime_private.test + -- SQLNESS SORT_RESULT 2 1 TQL EVAL (0, 10, '5s') {__name__="test", __field__="i"}; diff --git a/tests/cases/standalone/common/tql/basic.sql b/tests/cases/standalone/common/tql/basic.sql index 9d9f3c8863b4..27ed5153f134 100644 --- a/tests/cases/standalone/common/tql/basic.sql +++ b/tests/cases/standalone/common/tql/basic.sql @@ -10,6 +10,12 @@ TQL EVAL (0, 10, '5s') test; -- SQLNESS SORT_RESULT 2 1 TQL EVAL (0, 10, '5s') {__name__="test"}; +-- SQLNESS SORT_RESULT 2 1 +TQL EVAL (0, 10, '5s') test{__schema__="public"}; + +-- SQLNESS SORT_RESULT 2 1 +TQL EVAL (0, 10, '5s') test{__schema__="greptime_private"}; + -- SQLNESS SORT_RESULT 2 1 TQL EVAL (0, 10, '5s') {__name__="test", __field__="i"};