From 6a12c27e78ad484a4848855dee7032436c5dd128 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Mon, 22 Jan 2024 22:14:03 +0800 Subject: [PATCH] feat: make query be aware of timezone setting (#3175) * feat: let TypeConversionRule aware query context timezone setting * chore: don't optimize explain command * feat: parse string into timestamp with timezone * fix: compile error * chore: check the scalar value type in predicate * chore: remove mut for engine context * chore: return none if the scalar value is utf8 in time range predicate * fix: some fixme * feat: let Date and DateTime parsing from string value be aware of timezone * chore: tweak * test: add datetime from_str test with timezone * feat: construct function context from query context * test: add timezone test for to_unixtime and date_format function * fix: typo * chore: apply suggestion * test: adds string with timezone * chore: apply CR suggestion Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * chore: apply suggestion --------- Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- Cargo.lock | 1 + src/cmd/src/cli/repl.rs | 7 +- src/common/function/Cargo.toml | 1 + src/common/function/src/function.rs | 8 +- .../function/src/scalars/date/date_format.rs | 6 +- .../src/scalars/timestamp/greatest.rs | 9 +- .../src/scalars/timestamp/to_unixtime.rs | 14 +- src/common/function/src/scalars/udf.rs | 16 +- src/common/query/src/logical_plan.rs | 2 +- src/common/query/src/logical_plan/udf.rs | 13 +- src/common/time/src/date.rs | 95 ++++-- src/common/time/src/datetime.rs | 112 ++++++-- src/common/time/src/timestamp.rs | 60 ++-- src/common/time/src/util.rs | 19 +- src/datanode/src/tests.rs | 12 +- src/datatypes/src/types/cast.rs | 8 +- src/datatypes/src/types/date_type.rs | 22 +- src/datatypes/src/types/datetime_type.rs | 12 +- src/datatypes/src/types/timestamp_type.rs | 6 +- src/datatypes/src/value.rs | 13 +- src/frontend/src/instance.rs | 4 +- src/operator/src/statement.rs | 13 +- src/query/src/datafusion.rs | 44 ++- src/query/src/datafusion/planner.rs | 8 +- src/query/src/error.rs | 4 + src/query/src/logical_optimizer.rs | 5 +- src/query/src/optimizer.rs | 22 ++ src/query/src/optimizer/type_conversion.rs | 82 ++++-- src/query/src/parser.rs | 1 - src/query/src/planner.rs | 16 +- src/query/src/query_engine.rs | 21 +- src/query/src/query_engine/context.rs | 19 ++ src/query/src/query_engine/state.rs | 60 +++- src/query/src/range_select/plan_rewrite.rs | 44 ++- src/query/src/tests.rs | 2 +- src/script/src/python/engine.rs | 20 +- src/servers/src/mysql/federated.rs | 7 +- src/servers/tests/mod.rs | 4 +- src/sql/src/statements.rs | 6 +- src/table/src/predicate.rs | 33 ++- .../standalone/common/system/timezone.result | 271 ++++++++++++++++++ .../standalone/common/system/timezone.sql | 70 +++++ .../common/types/timestamp/timestamp.result | 17 +- .../common/types/timestamp/timestamp.sql | 3 +- tests/runner/src/env.rs | 19 ++ 45 files changed, 994 insertions(+), 237 deletions(-) create mode 100644 tests/cases/standalone/common/system/timezone.result create mode 100644 tests/cases/standalone/common/system/timezone.sql diff --git a/Cargo.lock b/Cargo.lock index c9986c705860..abc90012a3ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1710,6 +1710,7 @@ dependencies = [ "paste", "ron", "serde", + "session", "snafu", "statrs", ] diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 066d89e69d5b..c8f5ae841854 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -164,12 +164,13 @@ impl 
Repl { let plan = query_engine .planner() - .plan(stmt, query_ctx) + .plan(stmt, query_ctx.clone()) .await .context(PlanStatementSnafu)?; - let LogicalPlan::DfPlan(plan) = - query_engine.optimize(&plan).context(PlanStatementSnafu)?; + let LogicalPlan::DfPlan(plan) = query_engine + .optimize(&query_engine.engine_context(query_ctx), &plan) + .context(PlanStatementSnafu)?; let plan = DFLogicalSubstraitConvertor {} .encode(&plan) diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 3db195668e60..05dbdce23f5c 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -19,6 +19,7 @@ num = "0.4" num-traits = "0.2" once_cell.workspace = true paste = "1.0" +session.workspace = true snafu.workspace = true statrs = "0.16" diff --git a/src/common/function/src/function.rs b/src/common/function/src/function.rs index 3b6a51b0289f..a1e43aca4a97 100644 --- a/src/common/function/src/function.rs +++ b/src/common/function/src/function.rs @@ -17,20 +17,20 @@ use std::sync::Arc; use common_query::error::Result; use common_query::prelude::Signature; -use common_time::timezone::get_timezone; -use common_time::Timezone; use datatypes::data_type::ConcreteDataType; use datatypes::vectors::VectorRef; +use session::context::{QueryContextBuilder, QueryContextRef}; +/// The function execution context #[derive(Clone)] pub struct FunctionContext { - pub timezone: Timezone, + pub query_ctx: QueryContextRef, } impl Default for FunctionContext { fn default() -> Self { Self { - timezone: get_timezone(None).clone(), + query_ctx: QueryContextBuilder::default().build(), } } } diff --git a/src/common/function/src/scalars/date/date_format.rs b/src/common/function/src/scalars/date/date_format.rs index d94f115e54a9..fc82dbe06edc 100644 --- a/src/common/function/src/scalars/date/date_format.rs +++ b/src/common/function/src/scalars/date/date_format.rs @@ -79,7 +79,7 @@ impl Function for DateFormatFunction { let result = match (ts, format) { (Some(ts), Some(fmt)) => Some( - ts.as_formatted_string(&fmt, Some(&func_ctx.timezone)) + ts.as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone())) .map_err(BoxedError::new) .context(error::ExecuteSnafu)?, ), @@ -96,7 +96,7 @@ impl Function for DateFormatFunction { let result = match (date, format) { (Some(date), Some(fmt)) => date - .as_formatted_string(&fmt, Some(&func_ctx.timezone)) + .as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone())) .map_err(BoxedError::new) .context(error::ExecuteSnafu)?, _ => None, @@ -112,7 +112,7 @@ impl Function for DateFormatFunction { let result = match (datetime, format) { (Some(datetime), Some(fmt)) => datetime - .as_formatted_string(&fmt, Some(&func_ctx.timezone)) + .as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone())) .map_err(BoxedError::new) .context(error::ExecuteSnafu)?, _ => None, diff --git a/src/common/function/src/scalars/timestamp/greatest.rs b/src/common/function/src/scalars/timestamp/greatest.rs index fd3fe0a16897..e8dfd21a65b9 100644 --- a/src/common/function/src/scalars/timestamp/greatest.rs +++ b/src/common/function/src/scalars/timestamp/greatest.rs @@ -104,7 +104,6 @@ impl fmt::Display for GreatestFunction { #[cfg(test)] mod tests { - use std::str::FromStr; use std::sync::Arc; use common_time::Date; @@ -137,11 +136,11 @@ mod tests { assert_eq!(result.len(), 2); assert_eq!( result.get(0), - Value::Date(Date::from_str("2001-02-01").unwrap()) + Value::Date(Date::from_str_utc("2001-02-01").unwrap()) ); assert_eq!( result.get(1), - 
Value::Date(Date::from_str("2012-12-23").unwrap()) + Value::Date(Date::from_str_utc("2012-12-23").unwrap()) ); } @@ -162,11 +161,11 @@ mod tests { assert_eq!(result.len(), 2); assert_eq!( result.get(0), - Value::Date(Date::from_str("1970-01-01").unwrap()) + Value::Date(Date::from_str_utc("1970-01-01").unwrap()) ); assert_eq!( result.get(1), - Value::Date(Date::from_str("1970-01-03").unwrap()) + Value::Date(Date::from_str_utc("1970-01-03").unwrap()) ); } } diff --git a/src/common/function/src/scalars/timestamp/to_unixtime.rs b/src/common/function/src/scalars/timestamp/to_unixtime.rs index 4d914ecba919..cc297942d114 100644 --- a/src/common/function/src/scalars/timestamp/to_unixtime.rs +++ b/src/common/function/src/scalars/timestamp/to_unixtime.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt; -use std::str::FromStr; use std::sync::Arc; use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; @@ -31,16 +30,17 @@ pub struct ToUnixtimeFunction; const NAME: &str = "to_unixtime"; -fn convert_to_seconds(arg: &str) -> Option { - if let Ok(dt) = DateTime::from_str(arg) { +fn convert_to_seconds(arg: &str, func_ctx: &FunctionContext) -> Option { + let timezone = &func_ctx.query_ctx.timezone(); + if let Ok(dt) = DateTime::from_str(arg, Some(timezone)) { return Some(dt.val() / 1000); } - if let Ok(ts) = Timestamp::from_str(arg) { + if let Ok(ts) = Timestamp::from_str(arg, Some(timezone)) { return Some(ts.split().0); } - if let Ok(date) = Date::from_str(arg) { + if let Ok(date) = Date::from_str(arg, Some(timezone)) { return Some(date.to_secs()); } @@ -92,7 +92,7 @@ impl Function for ToUnixtimeFunction { ) } - fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { ensure!( columns.len() == 1, InvalidFuncArgsSnafu { @@ -108,7 +108,7 @@ impl Function for ToUnixtimeFunction { match columns[0].data_type() { ConcreteDataType::String(_) => Ok(Arc::new(Int64Vector::from( (0..vector.len()) - .map(|i| convert_to_seconds(&vector.get(i).to_string())) + .map(|i| convert_to_seconds(&vector.get(i).to_string(), &func_ctx)) .collect::>(), ))), ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => { diff --git a/src/common/function/src/scalars/udf.rs b/src/common/function/src/scalars/udf.rs index 5b91ad1302ac..0555581e6117 100644 --- a/src/common/function/src/scalars/udf.rs +++ b/src/common/function/src/scalars/udf.rs @@ -21,22 +21,24 @@ use common_query::prelude::{ use datatypes::error::Error as DataTypeError; use datatypes::prelude::*; use datatypes::vectors::Helper; +use session::context::QueryContextRef; use snafu::ResultExt; use crate::function::{FunctionContext, FunctionRef}; -/// Create a ScalarUdf from function. -pub fn create_udf(func: FunctionRef) -> ScalarUdf { +/// Create a ScalarUdf from function and query context. +pub fn create_udf(func: FunctionRef, query_ctx: QueryContextRef) -> ScalarUdf { let func_cloned = func.clone(); let return_type: ReturnTypeFunction = Arc::new(move |input_types: &[ConcreteDataType]| { Ok(Arc::new(func_cloned.return_type(input_types)?)) }); let func_cloned = func.clone(); + let fun: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| { - // FIXME(dennis): set the timezone into function context - // Question: how to get the timezone from the query context? 
- let func_ctx = FunctionContext::default(); + let func_ctx = FunctionContext { + query_ctx: query_ctx.clone(), + }; let len = args .iter() @@ -72,6 +74,7 @@ mod tests { use datatypes::prelude::{ScalarVector, Vector, VectorRef}; use datatypes::value::Value; use datatypes::vectors::{BooleanVector, ConstantVector}; + use session::context::QueryContextBuilder; use super::*; use crate::function::Function; @@ -80,6 +83,7 @@ mod tests { #[test] fn test_create_udf() { let f = Arc::new(TestAndFunction); + let query_ctx = QueryContextBuilder::default().build(); let args: Vec = vec![ Arc::new(ConstantVector::new( @@ -97,7 +101,7 @@ mod tests { } // create a udf and test it again - let udf = create_udf(f.clone()); + let udf = create_udf(f.clone(), query_ctx); assert_eq!("test_and", udf.name); assert_eq!(f.signature(), udf.signature); diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs index 0b8d67ae6abb..ac20c74b5b58 100644 --- a/src/common/query/src/logical_plan.rs +++ b/src/common/query/src/logical_plan.rs @@ -134,7 +134,7 @@ mod tests { assert_eq!(return_type, (udf.return_type)(&[]).unwrap()); // test into_df_udf - let df_udf: DfScalarUDF = udf.into_df_udf(); + let df_udf: DfScalarUDF = udf.into(); assert_eq!("and", df_udf.name); let types = vec![DataType::Boolean, DataType::Boolean]; diff --git a/src/common/query/src/logical_plan/udf.rs b/src/common/query/src/logical_plan/udf.rs index 6a48a7b97ff9..31d356174502 100644 --- a/src/common/query/src/logical_plan/udf.rs +++ b/src/common/query/src/logical_plan/udf.rs @@ -66,14 +66,15 @@ impl ScalarUdf { fun: fun.clone(), } } +} - /// Cast self into datafusion UDF. - pub fn into_df_udf(self) -> DfScalarUDF { +impl From for DfScalarUDF { + fn from(udf: ScalarUdf) -> Self { DfScalarUDF::new( - &self.name, - &self.signature.into(), - &to_df_return_type(self.return_type), - &to_df_scalar_func(self.fun), + &udf.name, + &udf.signature.into(), + &to_df_return_type(udf.return_type), + &to_df_scalar_func(udf.fun), ) } } diff --git a/src/common/time/src/date.rs b/src/common/time/src/date.rs index d4182b7c1b6a..a04b529fd448 100644 --- a/src/common/time/src/date.rs +++ b/src/common/time/src/date.rs @@ -13,16 +13,16 @@ // limitations under the License. use std::fmt::{Display, Formatter, Write}; -use std::str::FromStr; -use chrono::{Datelike, Days, Months, NaiveDate, NaiveTime, TimeZone}; +use chrono::{Datelike, Days, LocalResult, Months, NaiveDate, NaiveTime, TimeZone}; use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::ResultExt; -use crate::error::{Error, ParseDateStrSnafu, Result}; +use crate::error::{InvalidDateStrSnafu, ParseDateStrSnafu, Result}; use crate::interval::Interval; use crate::timezone::get_timezone; +use crate::util::datetime_to_utc; use crate::Timezone; const UNIX_EPOCH_FROM_CE: i32 = 719_163; @@ -40,16 +40,6 @@ impl From for Value { } } -impl FromStr for Date { - type Err = Error; - - fn from_str(s: &str) -> Result { - let s = s.trim(); - let date = NaiveDate::parse_from_str(s, "%F").context(ParseDateStrSnafu { raw: s })?; - Ok(Self(date.num_days_from_ce() - UNIX_EPOCH_FROM_CE)) - } -} - impl From for Date { fn from(v: i32) -> Self { Self(v) @@ -74,6 +64,26 @@ impl Display for Date { } impl Date { + /// Try parsing a string into [`Date`] with UTC timezone. + pub fn from_str_utc(s: &str) -> Result { + Self::from_str(s, None) + } + + /// Try parsing a string into [`Date`] with given timezone. 
+ pub fn from_str(s: &str, timezone: Option<&Timezone>) -> Result { + let s = s.trim(); + let date = NaiveDate::parse_from_str(s, "%F").context(ParseDateStrSnafu { raw: s })?; + let Some(timezone) = timezone else { + return Ok(Self(date.num_days_from_ce() - UNIX_EPOCH_FROM_CE)); + }; + + let datetime = date.and_time(NaiveTime::default()); + match datetime_to_utc(&datetime, timezone) { + LocalResult::None => InvalidDateStrSnafu { raw: s }.fail(), + LocalResult::Single(utc) | LocalResult::Ambiguous(utc, _) => Ok(Date::from(utc.date())), + } + } + pub fn new(val: i32) -> Self { Self(val) } @@ -168,23 +178,64 @@ mod tests { pub fn test_date_parse() { assert_eq!( "1970-01-01", - Date::from_str("1970-01-01").unwrap().to_string() + Date::from_str("1970-01-01", None).unwrap().to_string() ); assert_eq!( "1969-01-01", - Date::from_str("1969-01-01").unwrap().to_string() + Date::from_str("1969-01-01", None).unwrap().to_string() ); assert_eq!( "1969-01-01", - Date::from_str(" 1969-01-01 ") + Date::from_str(" 1969-01-01 ", None) .unwrap() .to_string() ); let now = Utc::now().date_naive().format("%F").to_string(); - assert_eq!(now, Date::from_str(&now).unwrap().to_string()); + assert_eq!(now, Date::from_str(&now, None).unwrap().to_string()); + + // with timezone + assert_eq!( + "1969-12-31", + Date::from_str( + "1970-01-01", + Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()) + ) + .unwrap() + .to_string() + ); + + assert_eq!( + "1969-12-31", + Date::from_str( + "1970-01-01", + Some(&Timezone::from_tz_string("+16:00").unwrap()) + ) + .unwrap() + .to_string() + ); + + assert_eq!( + "1970-01-01", + Date::from_str( + "1970-01-01", + Some(&Timezone::from_tz_string("-8:00").unwrap()) + ) + .unwrap() + .to_string() + ); + + assert_eq!( + "1970-01-01", + Date::from_str( + "1970-01-01", + Some(&Timezone::from_tz_string("-16:00").unwrap()) + ) + .unwrap() + .to_string() + ); } #[test] @@ -201,9 +252,9 @@ mod tests { #[test] pub fn test_min_max() { - let mut date = Date::from_str("9999-12-31").unwrap(); + let mut date = Date::from_str("9999-12-31", None).unwrap(); date.0 += 1000; - assert_eq!(date, Date::from_str(&date.to_string()).unwrap()); + assert_eq!(date, Date::from_str(&date.to_string(), None).unwrap()); } #[test] @@ -245,11 +296,11 @@ mod tests { #[test] fn test_to_secs() { - let d = Date::from_str("1970-01-01").unwrap(); + let d = Date::from_str("1970-01-01", None).unwrap(); assert_eq!(d.to_secs(), 0); - let d = Date::from_str("1970-01-02").unwrap(); + let d = Date::from_str("1970-01-02", None).unwrap(); assert_eq!(d.to_secs(), 24 * 3600); - let d = Date::from_str("1970-01-03").unwrap(); + let d = Date::from_str("1970-01-03", None).unwrap(); assert_eq!(d.to_secs(), 2 * 24 * 3600); } } diff --git a/src/common/time/src/datetime.rs b/src/common/time/src/datetime.rs index 722467cfab1f..26df42490c98 100644 --- a/src/common/time/src/datetime.rs +++ b/src/common/time/src/datetime.rs @@ -13,16 +13,15 @@ // limitations under the License. 
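A short worked note on the `Date::from_str` timezone tests above (a sketch for review, not part of the patch): the string is taken as local midnight in the given zone and converted to UTC before truncating to a date, which is why a positive offset can move the result to the previous day.

```rust
use common_time::{Date, Timezone};

fn main() {
    // Local midnight in +08:00 is 16:00 UTC of the previous day:
    //   1970-01-01 00:00 +08:00 == 1969-12-31 16:00 UTC -> 1969-12-31
    //   1970-01-01 00:00 -08:00 == 1970-01-01 08:00 UTC -> 1970-01-01
    let d = Date::from_str(
        "1970-01-01",
        Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()),
    )
    .unwrap();
    assert_eq!("1969-12-31", d.to_string());
}
```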
use std::fmt::{Display, Formatter, Write}; -use std::str::FromStr; use std::time::Duration; use chrono::{Days, LocalResult, Months, NaiveDateTime, TimeZone as ChronoTimeZone, Utc}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; -use crate::error::{Error, InvalidDateStrSnafu, Result}; +use crate::error::{InvalidDateStrSnafu, Result}; use crate::timezone::{get_timezone, Timezone}; -use crate::util::{format_utc_datetime, local_datetime_to_utc}; +use crate::util::{datetime_to_utc, format_utc_datetime}; use crate::{Date, Interval}; const DATETIME_FORMAT: &str = "%F %T"; @@ -60,17 +59,41 @@ impl From for DateTime { } } -impl FromStr for DateTime { - type Err = Error; +impl From for DateTime { + fn from(v: i64) -> Self { + Self(v) + } +} + +impl From for DateTime { + fn from(value: Date) -> Self { + // It's safe, i32 * 86400000 won't be overflow + Self(value.to_secs() * 1000) + } +} - fn from_str(s: &str) -> Result { +impl DateTime { + /// Try parsing a string into [`DateTime`] with the system timezone. + /// See `DateTime::from_str`. + pub fn from_str_system(s: &str) -> Result { + Self::from_str(s, None) + } + + /// Try parsing a string into [`DateTime`] with the given timezone. + /// Supported format: + /// - RFC3339 in the naive UTC timezone. + /// - `%F %T` with the given timezone + /// - `%F %T%z` with the timezone in string + pub fn from_str(s: &str, timezone: Option<&Timezone>) -> Result { let s = s.trim(); - let timestamp_millis = if let Ok(d) = NaiveDateTime::parse_from_str(s, DATETIME_FORMAT) { - match local_datetime_to_utc(&d) { + let timestamp_millis = if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) { + dt.naive_utc().timestamp_millis() + } else if let Ok(d) = NaiveDateTime::parse_from_str(s, DATETIME_FORMAT) { + match datetime_to_utc(&d, get_timezone(timezone)) { LocalResult::None => { return InvalidDateStrSnafu { raw: s }.fail(); } - LocalResult::Single(d) | LocalResult::Ambiguous(d, _) => d.timestamp_millis(), + LocalResult::Single(utc) | LocalResult::Ambiguous(utc, _) => utc.timestamp_millis(), } } else if let Ok(v) = chrono::DateTime::parse_from_str(s, DATETIME_FORMAT_WITH_TZ) { v.timestamp_millis() @@ -80,22 +103,7 @@ impl FromStr for DateTime { Ok(Self(timestamp_millis)) } -} - -impl From for DateTime { - fn from(v: i64) -> Self { - Self(v) - } -} - -impl From for DateTime { - fn from(value: Date) -> Self { - // It's safe, i32 * 86400000 won't be overflow - Self(value.to_secs() * 1000) - } -} -impl DateTime { /// Create a new [DateTime] from milliseconds elapsed since "1970-01-01 00:00:00 UTC" (UNIX Epoch). 
pub fn new(millis: i64) -> Self { Self(millis) @@ -201,9 +209,9 @@ mod tests { pub fn test_parse_from_string() { set_default_timezone(Some("Asia/Shanghai")).unwrap(); let time = "1970-01-01 00:00:00+0800"; - let dt = DateTime::from_str(time).unwrap(); + let dt = DateTime::from_str(time, None).unwrap(); assert_eq!(time, &dt.to_string()); - let dt = DateTime::from_str(" 1970-01-01 00:00:00+0800 ").unwrap(); + let dt = DateTime::from_str(" 1970-01-01 00:00:00+0800 ", None).unwrap(); assert_eq!(time, &dt.to_string()); } @@ -230,17 +238,63 @@ mod tests { set_default_timezone(Some("Asia/Shanghai")).unwrap(); assert_eq!( -28800000, - DateTime::from_str("1970-01-01 00:00:00").unwrap().val() + DateTime::from_str("1970-01-01 00:00:00", None) + .unwrap() + .val() + ); + assert_eq!( + 0, + DateTime::from_str("1970-01-01 08:00:00", None) + .unwrap() + .val() + ); + + assert_eq!( + 0, + DateTime::from_str( + "1970-01-01 08:00:00", + Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()) + ) + .unwrap() + .val() + ); + + assert_eq!( + -28800000, + DateTime::from_str( + "1970-01-01 00:00:00", + Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()) + ) + .unwrap() + .val() + ); + + assert_eq!( + 28800000, + DateTime::from_str( + "1970-01-01 00:00:00", + Some(&Timezone::from_tz_string("-8:00").unwrap()) + ) + .unwrap() + .val() ); - assert_eq!(0, DateTime::from_str("1970-01-01 08:00:00").unwrap().val()); } #[test] fn test_parse_local_date_time_with_tz() { - let ts = DateTime::from_str("1970-01-01 08:00:00+0000") + let ts = DateTime::from_str("1970-01-01 08:00:00+0000", None) .unwrap() .val(); assert_eq!(28800000, ts); + + // the string has the time zone info, the argument doesn't change the result + let ts = DateTime::from_str( + "1970-01-01 08:00:00+0000", + Some(&Timezone::from_tz_string("-8:00").unwrap()), + ) + .unwrap() + .val(); + assert_eq!(28800000, ts); } #[test] diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 2f13b09a1503..6aeac49b257e 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -16,21 +16,19 @@ use core::default::Default; use std::cmp::Ordering; use std::fmt::{Display, Formatter, Write}; use std::hash::{Hash, Hasher}; -use std::str::FromStr; use std::time::Duration; use arrow::datatypes::TimeUnit as ArrowTimeUnit; use chrono::{ - DateTime, Days, Months, NaiveDate, NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, Utc, + DateTime, Days, LocalResult, Months, NaiveDate, NaiveDateTime, NaiveTime, + TimeZone as ChronoTimeZone, Utc, }; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use crate::error::{ - ArithmeticOverflowSnafu, Error, ParseTimestampSnafu, Result, TimestampOverflowSnafu, -}; +use crate::error::{ArithmeticOverflowSnafu, ParseTimestampSnafu, Result, TimestampOverflowSnafu}; use crate::timezone::{get_timezone, Timezone}; -use crate::util::div_ceil; +use crate::util::{datetime_to_utc, div_ceil}; use crate::{error, Interval}; /// Timestamp represents the value of units(seconds/milliseconds/microseconds/nanoseconds) elapsed @@ -372,10 +370,12 @@ impl Timestamp { pub fn from_chrono_date(date: NaiveDate) -> Option { Timestamp::from_chrono_datetime(date.and_time(NaiveTime::default())) } -} -impl FromStr for Timestamp { - type Err = Error; + /// Accepts a string in RFC3339 / ISO8601 standard format and some variants and converts it to a nanosecond precision timestamp. + /// It no timezone specified in string, it cast to nanosecond epoch timestamp in UTC. 
+ pub fn from_str_utc(s: &str) -> Result { + Self::from_str(s, None) + } /// Accepts a string in RFC3339 / ISO8601 standard format and some variants and converts it to a nanosecond precision timestamp. /// This code is copied from [arrow-datafusion](https://github.com/apache/arrow-datafusion/blob/arrow2/datafusion-physical-expr/src/arrow_temporal_util.rs#L71) @@ -383,13 +383,13 @@ impl FromStr for Timestamp { /// Supported format: /// - `2022-09-20T14:16:43.012345Z` (Zulu timezone) /// - `2022-09-20T14:16:43.012345+08:00` (Explicit offset) - /// - `2022-09-20T14:16:43.012345` (Zulu timezone, with T) + /// - `2022-09-20T14:16:43.012345` (The given timezone, with T) /// - `2022-09-20T14:16:43` (Zulu timezone, no fractional seconds, with T) /// - `2022-09-20 14:16:43.012345Z` (Zulu timezone, without T) - /// - `2022-09-20 14:16:43` (Zulu timezone, without T) - /// - `2022-09-20 14:16:43.012345` (Zulu timezone, without T) + /// - `2022-09-20 14:16:43` (The given timezone, without T) + /// - `2022-09-20 14:16:43.012345` (The given timezone, without T) #[allow(deprecated)] - fn from_str(s: &str) -> std::result::Result { + pub fn from_str(s: &str, timezone: Option<&Timezone>) -> Result { // RFC3339 timestamp (with a T) let s = s.trim(); if let Ok(ts) = DateTime::parse_from_rfc3339(s) { @@ -406,19 +406,19 @@ impl FromStr for Timestamp { } if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") { - return naive_datetime_to_timestamp(s, ts); + return naive_datetime_to_timestamp(s, ts, timezone); } if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") { - return naive_datetime_to_timestamp(s, ts); + return naive_datetime_to_timestamp(s, ts, timezone); } if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") { - return naive_datetime_to_timestamp(s, ts); + return naive_datetime_to_timestamp(s, ts, timezone); } if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") { - return naive_datetime_to_timestamp(s, ts); + return naive_datetime_to_timestamp(s, ts, timezone); } ParseTimestampSnafu { raw: s }.fail() @@ -430,9 +430,19 @@ impl FromStr for Timestamp { fn naive_datetime_to_timestamp( s: &str, datetime: NaiveDateTime, + timezone: Option<&Timezone>, ) -> crate::error::Result { - Timestamp::from_chrono_datetime(Utc.from_utc_datetime(&datetime).naive_utc()) - .context(ParseTimestampSnafu { raw: s }) + let Some(timezone) = timezone else { + return Timestamp::from_chrono_datetime(Utc.from_utc_datetime(&datetime).naive_utc()) + .context(ParseTimestampSnafu { raw: s }); + }; + + match datetime_to_utc(&datetime, timezone) { + LocalResult::None => ParseTimestampSnafu { raw: s }.fail(), + LocalResult::Single(utc) | LocalResult::Ambiguous(utc, _) => { + Timestamp::from_chrono_datetime(utc).context(ParseTimestampSnafu { raw: s }) + } + } } impl From for Timestamp { @@ -786,7 +796,7 @@ mod tests { // Input timestamp string is regarded as local timezone if no timezone is specified, // but expected timestamp is in UTC timezone fn check_from_str(s: &str, expect: &str) { - let ts = Timestamp::from_str(s).unwrap(); + let ts = Timestamp::from_str_utc(s).unwrap(); let time = ts.to_chrono_datetime().unwrap(); assert_eq!(expect, time.to_string()); } @@ -812,7 +822,7 @@ mod tests { fn test_to_iso8601_string() { set_default_timezone(Some("Asia/Shanghai")).unwrap(); let datetime_str = "2020-09-08 13:42:29.042+0000"; - let ts = Timestamp::from_str(datetime_str).unwrap(); + let ts = Timestamp::from_str_utc(datetime_str).unwrap(); assert_eq!("2020-09-08 
21:42:29.042+0800", ts.to_iso8601_string()); let ts_millis = 1668070237000; @@ -1079,17 +1089,17 @@ mod tests { std::env::set_var("TZ", "Asia/Shanghai"); assert_eq!( Timestamp::new(28800, TimeUnit::Second), - Timestamp::from_str("1970-01-01 08:00:00.000").unwrap() + Timestamp::from_str_utc("1970-01-01 08:00:00.000").unwrap() ); assert_eq!( Timestamp::new(28800, TimeUnit::Second), - Timestamp::from_str("1970-01-01 08:00:00").unwrap() + Timestamp::from_str_utc("1970-01-01 08:00:00").unwrap() ); assert_eq!( Timestamp::new(28800, TimeUnit::Second), - Timestamp::from_str(" 1970-01-01 08:00:00 ").unwrap() + Timestamp::from_str_utc(" 1970-01-01 08:00:00 ").unwrap() ); } @@ -1286,7 +1296,7 @@ mod tests { ]; for s in valid_strings { - Timestamp::from_str(s).unwrap(); + Timestamp::from_str_utc(s).unwrap(); } } } diff --git a/src/common/time/src/util.rs b/src/common/time/src/util.rs index 1a890ec2092f..6ce824764a2b 100644 --- a/src/common/time/src/util.rs +++ b/src/common/time/src/util.rs @@ -18,6 +18,7 @@ use chrono::{LocalResult, NaiveDateTime, TimeZone}; use chrono_tz::Tz; use crate::timezone::get_timezone; +use crate::Timezone; pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String { match get_timezone(None) { @@ -28,10 +29,20 @@ pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String { } } -pub fn local_datetime_to_utc(local: &NaiveDateTime) -> LocalResult { - match get_timezone(None) { - crate::Timezone::Offset(offset) => offset.from_local_datetime(local).map(|x| x.naive_utc()), - crate::Timezone::Named(tz) => tz.from_local_datetime(local).map(|x| x.naive_utc()), +pub fn system_datetime_to_utc(local: &NaiveDateTime) -> LocalResult { + datetime_to_utc(local, get_timezone(None)) +} + +/// Cast a [`NaiveDateTime`] with the given timezone. 
+pub fn datetime_to_utc( + datetime: &NaiveDateTime, + timezone: &Timezone, +) -> LocalResult { + match timezone { + crate::Timezone::Offset(offset) => { + offset.from_local_datetime(datetime).map(|x| x.naive_utc()) + } + crate::Timezone::Named(tz) => tz.from_local_datetime(datetime).map(|x| x.naive_utc()), } } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index e04dd32907ee..1a38c88f0c8d 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -28,7 +28,7 @@ use query::dataframe::DataFrame; use query::plan::LogicalPlan; use query::planner::LogicalPlanner; use query::query_engine::DescribeResult; -use query::QueryEngine; +use query::{QueryEngine, QueryEngineContext}; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; @@ -57,7 +57,11 @@ impl QueryEngine for MockQueryEngine { "MockQueryEngine" } - async fn describe(&self, _plan: LogicalPlan) -> query::error::Result { + async fn describe( + &self, + _plan: LogicalPlan, + _query_ctx: QueryContextRef, + ) -> query::error::Result { unimplemented!() } @@ -78,6 +82,10 @@ impl QueryEngine for MockQueryEngine { fn read_table(&self, _table: TableRef) -> query::error::Result { unimplemented!() } + + fn engine_context(&self, _query_ctx: QueryContextRef) -> QueryEngineContext { + unimplemented!() + } } /// Create a region server without any engine diff --git a/src/datatypes/src/types/cast.rs b/src/datatypes/src/types/cast.rs index d92f5f9bbfbb..702244a14291 100644 --- a/src/datatypes/src/types/cast.rs +++ b/src/datatypes/src/types/cast.rs @@ -172,8 +172,6 @@ fn invalid_type_cast(src_value: &Value, dest_type: &ConcreteDataType) -> Error { #[cfg(test)] mod tests { - use std::str::FromStr; - use common_base::bytes::StringBytes; use common_time::time::Time; use common_time::timezone::set_default_timezone; @@ -283,7 +281,7 @@ mod tests { // date -> other types test_can_cast!( - Value::Date(Date::from_str("2021-01-01").unwrap()), + Value::Date(Date::from_str_utc("2021-01-01").unwrap()), null_datatype, int32_datatype, timestamp_second_datatype, @@ -292,7 +290,7 @@ mod tests { // datetime -> other types test_can_cast!( - Value::DateTime(DateTime::from_str("2021-01-01 00:00:00").unwrap()), + Value::DateTime(DateTime::from_str_system("2021-01-01 00:00:00").unwrap()), null_datatype, int64_datatype, timestamp_second_datatype, @@ -301,7 +299,7 @@ mod tests { // timestamp -> other types test_can_cast!( - Value::Timestamp(Timestamp::from_str("2021-01-01 00:00:00").unwrap()), + Value::Timestamp(Timestamp::from_str_utc("2021-01-01 00:00:00").unwrap()), null_datatype, int64_datatype, date_datatype, diff --git a/src/datatypes/src/types/date_type.rs b/src/datatypes/src/types/date_type.rs index a0df0b5a2151..5481b6fa5bcf 100644 --- a/src/datatypes/src/types/date_type.rs +++ b/src/datatypes/src/types/date_type.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
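The `datetime_to_utc` helper added in `common_time::util` above returns chrono's `LocalResult` because a local wall-clock time can be nonexistent or ambiguous around DST transitions, which is why its callers match on `None` and `Ambiguous`. A hedged sketch, assuming the `util` module is reachable at this path from outside the crate (inside the crate it is `crate::util::datetime_to_utc`, as the imports above show); the timezone and dates are arbitrary illustrations:

```rust
use chrono::{LocalResult, NaiveDate};
use common_time::util::datetime_to_utc; // assumed public path; see note above
use common_time::Timezone;

fn main() {
    let tz = Timezone::from_tz_string("America/New_York").unwrap();

    // 02:30 on 2021-03-14 does not exist locally (spring-forward gap).
    let gap = NaiveDate::from_ymd_opt(2021, 3, 14)
        .unwrap()
        .and_hms_opt(2, 30, 0)
        .unwrap();
    assert!(matches!(datetime_to_utc(&gap, &tz), LocalResult::None));

    // 01:30 on 2021-11-07 occurs twice locally (fall-back fold).
    let fold = NaiveDate::from_ymd_opt(2021, 11, 7)
        .unwrap()
        .and_hms_opt(1, 30, 0)
        .unwrap();
    assert!(matches!(
        datetime_to_utc(&fold, &tz),
        LocalResult::Ambiguous(_, _)
    ));
}
```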
-use std::str::FromStr; - use arrow::datatypes::{DataType as ArrowDataType, Date32Type}; use common_time::Date; use serde::{Deserialize, Serialize}; @@ -55,7 +53,7 @@ impl DataType for DateType { fn try_cast(&self, from: Value) -> Option { match from { Value::Int32(v) => Some(Value::Date(Date::from(v))), - Value::String(v) => Date::from_str(v.as_utf8()).map(Value::Date).ok(), + Value::String(v) => Date::from_str_utc(v.as_utf8()).map(Value::Date).ok(), Value::Timestamp(v) => v.to_chrono_date().map(|date| Value::Date(date.into())), Value::DateTime(v) => Some(Value::DateTime(v)), _ => None, @@ -111,32 +109,32 @@ mod tests { fn test_date_cast() { set_default_timezone(Some("Asia/Shanghai")).unwrap(); // timestamp -> date - let ts = Value::Timestamp(Timestamp::from_str("2000-01-01 08:00:01").unwrap()); + let ts = Value::Timestamp(Timestamp::from_str_utc("2000-01-01 08:00:01").unwrap()); let date = ConcreteDataType::date_datatype().try_cast(ts).unwrap(); - assert_eq!(date, Value::Date(Date::from_str("2000-01-01").unwrap())); + assert_eq!(date, Value::Date(Date::from_str_utc("2000-01-01").unwrap())); // this case bind with Zulu timezone. - let ts = Value::Timestamp(Timestamp::from_str("2000-01-02 07:59:59").unwrap()); + let ts = Value::Timestamp(Timestamp::from_str_utc("2000-01-02 07:59:59").unwrap()); let date = ConcreteDataType::date_datatype().try_cast(ts).unwrap(); - assert_eq!(date, Value::Date(Date::from_str("2000-01-02").unwrap())); + assert_eq!(date, Value::Date(Date::from_str_utc("2000-01-02").unwrap())); // while this case is offsetted to Asia/Shanghai. - let ts = Value::Timestamp(Timestamp::from_str("2000-01-02 07:59:59+08:00").unwrap()); + let ts = Value::Timestamp(Timestamp::from_str_utc("2000-01-02 07:59:59+08:00").unwrap()); let date = ConcreteDataType::date_datatype().try_cast(ts).unwrap(); - assert_eq!(date, Value::Date(Date::from_str("2000-01-01").unwrap())); + assert_eq!(date, Value::Date(Date::from_str_utc("2000-01-01").unwrap())); // Int32 -> date let val = Value::Int32(0); let date = ConcreteDataType::date_datatype().try_cast(val).unwrap(); - assert_eq!(date, Value::Date(Date::from_str("1970-01-01").unwrap())); + assert_eq!(date, Value::Date(Date::from_str_utc("1970-01-01").unwrap())); let val = Value::Int32(19614); let date = ConcreteDataType::date_datatype().try_cast(val).unwrap(); - assert_eq!(date, Value::Date(Date::from_str("2023-09-14").unwrap())); + assert_eq!(date, Value::Date(Date::from_str_utc("2023-09-14").unwrap())); // String -> date let s = Value::String(StringBytes::from("1970-02-12")); let date = ConcreteDataType::date_datatype().try_cast(s).unwrap(); - assert_eq!(date, Value::Date(Date::from_str("1970-02-12").unwrap())); + assert_eq!(date, Value::Date(Date::from_str_utc("1970-02-12").unwrap())); } } diff --git a/src/datatypes/src/types/datetime_type.rs b/src/datatypes/src/types/datetime_type.rs index 4e23982a2e34..699eea3067ea 100644 --- a/src/datatypes/src/types/datetime_type.rs +++ b/src/datatypes/src/types/datetime_type.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
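A usage sketch for the cast changes in this file and the next one, modeled on the test module above (imports are assumptions based on the crate layout shown in this patch): string to `Date` casts parse as UTC via `Date::from_str_utc`, while string to `DateTime` casts fall back to the system default timezone via `DateTime::from_str_system`.

```rust
use common_base::bytes::StringBytes;
use common_time::Date;
use datatypes::prelude::*;
use datatypes::value::Value;

fn main() {
    // A string literal cast to Date is parsed as UTC, so the stored day does
    // not depend on the session timezone.
    let date = ConcreteDataType::date_datatype()
        .try_cast(Value::String(StringBytes::from("1970-02-12")))
        .unwrap();
    assert_eq!(date, Value::Date(Date::from_str_utc("1970-02-12").unwrap()));
}
```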
-use std::str::FromStr; - use arrow::datatypes::{DataType as ArrowDataType, Date64Type}; use common_time::DateTime; use serde::{Deserialize, Serialize}; @@ -54,7 +52,9 @@ impl DataType for DateTimeType { match from { Value::Int64(v) => Some(Value::DateTime(DateTime::from(v))), Value::Timestamp(v) => v.to_chrono_datetime().map(|d| Value::DateTime(d.into())), - Value::String(v) => DateTime::from_str(v.as_utf8()).map(Value::DateTime).ok(), + Value::String(v) => DateTime::from_str_system(v.as_utf8()) + .map(Value::DateTime) + .ok(), _ => None, } } @@ -119,15 +119,15 @@ mod tests { let dt = ConcreteDataType::datetime_datatype().try_cast(val).unwrap(); assert_eq!( dt, - Value::DateTime(DateTime::from_str("1970-01-01 00:00:00+0800").unwrap()) + Value::DateTime(DateTime::from_str_system("1970-01-01 00:00:00+0800").unwrap()) ); // cast from Timestamp - let val = Value::Timestamp(Timestamp::from_str("2020-09-08 21:42:29+0800").unwrap()); + let val = Value::Timestamp(Timestamp::from_str_utc("2020-09-08 21:42:29+0800").unwrap()); let dt = ConcreteDataType::datetime_datatype().try_cast(val).unwrap(); assert_eq!( dt, - Value::DateTime(DateTime::from_str("2020-09-08 21:42:29+0800").unwrap()) + Value::DateTime(DateTime::from_str_system("2020-09-08 21:42:29+0800").unwrap()) ); } } diff --git a/src/datatypes/src/types/timestamp_type.rs b/src/datatypes/src/types/timestamp_type.rs index bca9d3e8e2e2..cda1c2603a60 100644 --- a/src/datatypes/src/types/timestamp_type.rs +++ b/src/datatypes/src/types/timestamp_type.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::str::FromStr; - use arrow::datatypes::{ DataType as ArrowDataType, TimeUnit as ArrowTimeUnit, TimestampMicrosecondType as ArrowTimestampMicrosecondType, @@ -132,7 +130,7 @@ macro_rules! 
impl_data_type_for_timestamp { fn try_cast(&self, from: Value)-> Option{ match from { Value::Timestamp(v) => v.convert_to(TimeUnit::$unit).map(Value::Timestamp), - Value::String(v) => Timestamp::from_str(v.as_utf8()).map(Value::Timestamp).ok(), + Value::String(v) => Timestamp::from_str_utc(v.as_utf8()).map(Value::Timestamp).ok(), Value::Int64(v) => Some(Value::Timestamp(Timestamp::new(v, TimeUnit::$unit))), Value::DateTime(v) => Timestamp::new_second(v.val()).convert_to(TimeUnit::$unit).map(Value::Timestamp), Value::Date(v) => Timestamp::new_second(v.to_secs()).convert_to(TimeUnit::$unit).map(Value::Timestamp), @@ -259,7 +257,7 @@ mod tests { assert_eq!(ts, Value::Timestamp(Timestamp::new_second(1234567))); // Date -> TimestampMillisecond - let d = Value::Date(Date::from_str("1970-01-01").unwrap()); + let d = Value::Date(Date::from_str_utc("1970-01-01").unwrap()); let ts = ConcreteDataType::timestamp_millisecond_datatype() .try_cast(d) .unwrap(); diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index fede1420bb06..bf3445a922d9 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -14,7 +14,6 @@ use std::cmp::Ordering; use std::fmt::{Display, Formatter}; -use std::str::FromStr; use std::sync::Arc; use arrow::datatypes::{DataType as ArrowDataType, Field}; @@ -26,7 +25,7 @@ use common_time::datetime::DateTime; use common_time::interval::IntervalUnit; use common_time::time::Time; use common_time::timestamp::{TimeUnit, Timestamp}; -use common_time::{Duration, Interval}; +use common_time::{Duration, Interval, Timezone}; use datafusion_common::ScalarValue; pub use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; @@ -428,11 +427,15 @@ pub fn duration_to_scalar_value(unit: TimeUnit, val: Option) -> ScalarValue } } -/// Convert [ScalarValue] to [Timestamp]. +/// Convert [`ScalarValue`] to [`Timestamp`]. +/// If it's `ScalarValue::Utf8`, try to parse it with the given timezone. /// Return `None` if given scalar value cannot be converted to a valid timestamp. 
-pub fn scalar_value_to_timestamp(scalar: &ScalarValue) -> Option { +pub fn scalar_value_to_timestamp( + scalar: &ScalarValue, + timezone: Option<&Timezone>, +) -> Option { match scalar { - ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s) { + ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s, timezone) { Ok(t) => Some(t), Err(e) => { logging::error!(e;"Failed to convert string literal {s} to timestamp"); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index f73d490de74f..ae22121100b2 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -374,11 +374,11 @@ impl SqlQueryHandler for Instance { let plan = self .query_engine .planner() - .plan(QueryStatement::Sql(stmt), query_ctx) + .plan(QueryStatement::Sql(stmt), query_ctx.clone()) .await .context(PlanStatementSnafu)?; self.query_engine - .describe(plan) + .describe(plan, query_ctx) .await .map(Some) .context(error::DescribeStatementSnafu) diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 93106b8587a1..86600a8c8b9a 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -21,7 +21,6 @@ mod dml; mod show; mod tql; -use std::str::FromStr; use std::sync::Arc; use catalog::CatalogManagerRef; @@ -331,8 +330,8 @@ fn to_copy_database_request( .map_err(BoxedError::new) .context(ExternalSnafu)?; - let start_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_START_KEY)?; - let end_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_END_KEY)?; + let start_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_START_KEY, query_ctx)?; + let end_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_END_KEY, query_ctx)?; let time_range = match (start_timestamp, end_timestamp) { (Some(start), Some(end)) => TimestampRange::new(start, end), @@ -352,10 +351,14 @@ fn to_copy_database_request( } /// Extracts timestamp from a [HashMap] with given key. 
-fn extract_timestamp(map: &OptionMap, key: &str) -> Result> { +fn extract_timestamp( + map: &OptionMap, + key: &str, + query_ctx: &QueryContextRef, +) -> Result> { map.get(key) .map(|v| { - Timestamp::from_str(v) + Timestamp::from_str(v, Some(&query_ctx.timezone())) .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build()) }) .transpose() diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index b1c421b01cca..6dab18487c56 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -26,7 +26,6 @@ use common_base::Plugins; use common_error::ext::BoxedError; use common_function::function::FunctionRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; -use common_function::scalars::udf::create_udf; use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlan, PhysicalPlanAdapter}; use common_query::prelude::ScalarUdf; use common_query::Output; @@ -84,7 +83,7 @@ impl DatafusionQueryEngine { plan: LogicalPlan, query_ctx: QueryContextRef, ) -> Result { - let mut ctx = QueryEngineContext::new(self.state.session_state(), query_ctx.clone()); + let mut ctx = self.engine_context(query_ctx.clone()); // `create_physical_plan` will optimize logical plan internally let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?; @@ -242,8 +241,13 @@ impl QueryEngine for DatafusionQueryEngine { "datafusion" } - async fn describe(&self, plan: LogicalPlan) -> Result { - let optimised_plan = self.optimize(&plan)?; + async fn describe( + &self, + plan: LogicalPlan, + query_ctx: QueryContextRef, + ) -> Result { + let ctx = self.engine_context(query_ctx); + let optimised_plan = self.optimize(&ctx, &plan)?; Ok(DescribeResult { schema: optimised_plan.schema()?, logical_plan: optimised_plan, @@ -259,10 +263,6 @@ impl QueryEngine for DatafusionQueryEngine { } } - fn register_udf(&self, udf: ScalarUdf) { - self.state.register_udf(udf); - } - /// Note in SQL queries, aggregate names are looked up using /// lowercase unless the query uses quotes. For example, /// @@ -274,8 +274,15 @@ impl QueryEngine for DatafusionQueryEngine { self.state.register_aggregate_function(func); } + /// Register a [`ScalarUdf`]. + fn register_udf(&self, udf: ScalarUdf) { + self.state.register_udf(udf); + } + + /// Register an UDF function. + /// Will override if the function with same name is already registered. 
fn register_function(&self, func: FunctionRef) { - self.state.register_udf(create_udf(func)); + self.state.register_function(func); } fn read_table(&self, table: TableRef) -> Result { @@ -287,18 +294,31 @@ impl QueryEngine for DatafusionQueryEngine { .context(QueryExecutionSnafu)?, )) } + + fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext { + QueryEngineContext::new(self.state.session_state(), query_ctx) + } } impl LogicalOptimizer for DatafusionQueryEngine { #[tracing::instrument(skip_all)] - fn optimize(&self, plan: &LogicalPlan) -> Result { + fn optimize(&self, context: &QueryEngineContext, plan: &LogicalPlan) -> Result { let _timer = metrics::METRIC_OPTIMIZE_LOGICAL_ELAPSED.start_timer(); match plan { LogicalPlan::DfPlan(df_plan) => { + // Optimized by extension rules + let optimized_plan = self + .state + .optimize_by_extension_rules(df_plan.clone(), context) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + + // Optimized by datafusion optimizer let optimized_plan = self .state .session_state() - .optimize(df_plan) + .optimize(&optimized_plan) .context(error::DatafusionSnafu) .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; @@ -654,7 +674,7 @@ mod tests { let DescribeResult { schema, logical_plan, - } = engine.describe(plan).await.unwrap(); + } = engine.describe(plan, QueryContext::arc()).await.unwrap(); assert_eq!( schema.column_schemas()[0], diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 983c4155f7e1..4e4b02b0fdb8 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use arrow_schema::DataType; use catalog::table_source::DfTableSourceProvider; +use common_function::scalars::udf::create_udf; use common_query::logical_plan::create_aggregate_function; use datafusion::catalog::TableReference; use datafusion::error::Result as DfResult; @@ -41,6 +42,7 @@ pub struct DfContextProviderAdapter { session_state: SessionState, tables: HashMap>, table_provider: DfTableSourceProvider, + query_ctx: QueryContextRef, } impl DfContextProviderAdapter { @@ -67,6 +69,7 @@ impl DfContextProviderAdapter { session_state, tables, table_provider, + query_ctx, }) } } @@ -104,7 +107,10 @@ impl ContextProvider for DfContextProviderAdapter { } fn get_function_meta(&self, name: &str) -> Option> { - self.session_state.scalar_functions().get(name).cloned() + self.engine_state.udf_function(name).map_or_else( + || self.session_state.scalar_functions().get(name).cloned(), + |func| Some(Arc::new(create_udf(func, self.query_ctx.clone()).into())), + ) } fn get_aggregate_meta(&self, name: &str) -> Option> { diff --git a/src/query/src/error.rs b/src/query/src/error.rs index c343d25d51f5..96db70810481 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -115,6 +115,9 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid timestamp `{}`", raw))] + InvalidTimestamp { raw: String, location: Location }, + #[snafu(display("Failed to parse float number `{}`", raw))] ParseFloat { raw: String, @@ -271,6 +274,7 @@ impl ErrorExt for Error { | UnknownTable { .. } | TimeIndexNotFound { .. } | ParseTimestamp { .. } + | InvalidTimestamp { .. } | ParseFloat { .. } | MissingRequiredField { .. } | BuildRegex { .. 
} diff --git a/src/query/src/logical_optimizer.rs b/src/query/src/logical_optimizer.rs index 97e5a70d4a07..ab9bff445879 100644 --- a/src/query/src/logical_optimizer.rs +++ b/src/query/src/logical_optimizer.rs @@ -14,7 +14,10 @@ use crate::error::Result; use crate::plan::LogicalPlan; +use crate::QueryEngineContext; +/// Logical plan optimizer, rewrite the [`LogicalPlan`] in some way. pub trait LogicalOptimizer { - fn optimize(&self, plan: &LogicalPlan) -> Result; + /// Optimize the `plan` + fn optimize(&self, context: &QueryEngineContext, plan: &LogicalPlan) -> Result; } diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index 34108fe4889d..cfce77f039d8 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -12,6 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. +use datafusion_common::config::ConfigOptions; +use datafusion_common::Result; +use datafusion_expr::LogicalPlan; + +use crate::QueryEngineContext; + +/// [`ExtensionAnalyzerRule`]s transform [`LogicalPlan`]s in some way to make +/// the plan valid prior to the rest of the DataFusion optimization process. +/// It's an extension of datafusion [`AnalyzerRule`]s but accepts [`QueryEngineContext` as the second parameter. +pub trait ExtensionAnalyzerRule { + /// Rewrite `plan` + fn analyze( + &self, + plan: LogicalPlan, + ctx: &QueryEngineContext, + config: &ConfigOptions, + ) -> Result; + + /// A human readable name for this analyzer rule + fn name(&self) -> &str; +} + pub mod order_hint; pub mod string_normalization; pub mod type_conversion; diff --git a/src/query/src/optimizer/type_conversion.rs b/src/query/src/optimizer/type_conversion.rs index 07cba75d7f44..aa1fc73d4c1a 100644 --- a/src/query/src/optimizer/type_conversion.rs +++ b/src/query/src/optimizer/type_conversion.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::str::FromStr; - use common_time::timestamp::{TimeUnit, Timestamp}; +use common_time::Timezone; use datafusion::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue}; @@ -22,9 +21,12 @@ use datafusion_expr::expr::InList; use datafusion_expr::{ Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan, }; -use datafusion_optimizer::analyzer::AnalyzerRule; use datatypes::arrow::compute; use datatypes::arrow::datatypes::DataType; +use session::context::QueryContextRef; + +use crate::optimizer::ExtensionAnalyzerRule; +use crate::QueryEngineContext; /// TypeConversionRule converts some literal values in logical plan to other types according /// to data type of corresponding columns. 
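To make the behavior of `TypeConversionRule` concrete: a naive timestamp literal in a filter is now resolved in the query context's timezone before being rewritten to a typed literal. The sketch below mirrors the `string_to_timestamp_ms` tests added later in this file; that helper is module-internal, so this only applies inside `type_conversion.rs`.

```rust
#[test]
fn timestamp_literal_follows_session_timezone() {
    use common_time::Timezone;
    use datafusion_common::ScalarValue;

    // `WHERE ts > '2009-02-13 23:31:30'` under an Asia/Shanghai session resolves
    // the literal in that zone before the comparison is rewritten.
    assert_eq!(
        string_to_timestamp_ms(
            "2009-02-13 23:31:30",
            Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()),
        )
        .unwrap(),
        ScalarValue::TimestampSecond(Some(1234567890 - 8 * 3600), None)
    );
}
```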
@@ -33,12 +35,18 @@ use datatypes::arrow::datatypes::DataType; /// - string literal of boolean is converted to `Expr::Literal(ScalarValue::Boolean)` pub struct TypeConversionRule; -impl AnalyzerRule for TypeConversionRule { - fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result { +impl ExtensionAnalyzerRule for TypeConversionRule { + fn analyze( + &self, + plan: LogicalPlan, + ctx: &QueryEngineContext, + _config: &ConfigOptions, + ) -> Result { plan.transform(&|plan| match plan { LogicalPlan::Filter(filter) => { let mut converter = TypeConverter { schema: filter.input.schema().clone(), + query_ctx: ctx.query_ctx(), }; let rewritten = filter.predicate.clone().rewrite(&mut converter)?; Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new( @@ -56,6 +64,7 @@ impl AnalyzerRule for TypeConversionRule { }) => { let mut converter = TypeConverter { schema: projected_schema.clone(), + query_ctx: ctx.query_ctx(), }; let rewrite_filters = filters .into_iter() @@ -76,7 +85,6 @@ impl AnalyzerRule for TypeConversionRule { | LogicalPlan::Repartition { .. } | LogicalPlan::Extension { .. } | LogicalPlan::Sort { .. } - | LogicalPlan::Explain { .. } | LogicalPlan::Limit { .. } | LogicalPlan::Union { .. } | LogicalPlan::Join { .. } @@ -86,6 +94,7 @@ impl AnalyzerRule for TypeConversionRule { | LogicalPlan::Analyze { .. } => { let mut converter = TypeConverter { schema: plan.schema().clone(), + query_ctx: ctx.query_ctx(), }; let inputs = plan.inputs().into_iter().cloned().collect::>(); let expr = plan @@ -98,6 +107,7 @@ impl AnalyzerRule for TypeConversionRule { } LogicalPlan::Subquery { .. } + | LogicalPlan::Explain { .. } | LogicalPlan::SubqueryAlias { .. } | LogicalPlan::EmptyRelation(_) | LogicalPlan::Prepare(_) @@ -116,6 +126,7 @@ impl AnalyzerRule for TypeConversionRule { } struct TypeConverter { + query_ctx: QueryContextRef, schema: DFSchemaRef, } @@ -129,9 +140,15 @@ impl TypeConverter { None } - fn cast_scalar_value(value: &ScalarValue, target_type: &DataType) -> Result { + fn cast_scalar_value( + &self, + value: &ScalarValue, + target_type: &DataType, + ) -> Result { match (target_type, value) { - (DataType::Timestamp(_, _), ScalarValue::Utf8(Some(v))) => string_to_timestamp_ms(v), + (DataType::Timestamp(_, _), ScalarValue::Utf8(Some(v))) => { + string_to_timestamp_ms(v, Some(self.query_ctx.timezone().as_ref())) + } (DataType::Boolean, ScalarValue::Utf8(Some(v))) => match v.to_lowercase().as_str() { "true" => Ok(ScalarValue::Boolean(Some(true))), "false" => Ok(ScalarValue::Boolean(Some(false))), @@ -167,7 +184,7 @@ impl TypeConverter { match (left, right) { (Expr::Column(col), Expr::Literal(value)) => { - let casted_right = Self::cast_scalar_value(value, target_type)?; + let casted_right = self.cast_scalar_value(value, target_type)?; if casted_right.is_null() { return Err(DataFusionError::Plan(format!( "column:{col:?}. Casting value:{value:?} to {target_type:?} is invalid", @@ -176,7 +193,7 @@ impl TypeConverter { Ok((left.clone(), Expr::Literal(casted_right))) } (Expr::Literal(value), Expr::Column(col)) => { - let casted_left = Self::cast_scalar_value(value, target_type)?; + let casted_left = self.cast_scalar_value(value, target_type)?; if casted_left.is_null() { return Err(DataFusionError::Plan(format!( "column:{col:?}. 
Casting value:{value:?} to {target_type:?} is invalid", @@ -273,8 +290,9 @@ fn timestamp_to_timestamp_ms_expr(val: i64, unit: TimeUnit) -> Expr { Expr::Literal(ScalarValue::TimestampMillisecond(Some(timestamp), None)) } -fn string_to_timestamp_ms(string: &str) -> Result { - let ts = Timestamp::from_str(string).map_err(|e| DataFusionError::External(Box::new(e)))?; +fn string_to_timestamp_ms(string: &str, timezone: Option<&Timezone>) -> Result { + let ts = Timestamp::from_str(string, timezone) + .map_err(|e| DataFusionError::External(Box::new(e)))?; let value = Some(ts.value()); let scalar = match ts.unit() { @@ -295,19 +313,38 @@ mod tests { use datafusion_common::{Column, DFField, DFSchema}; use datafusion_expr::{AggregateFunction, LogicalPlanBuilder}; use datafusion_sql::TableReference; + use session::context::QueryContext; use super::*; #[test] fn test_string_to_timestamp_ms() { assert_eq!( - string_to_timestamp_ms("2022-02-02 19:00:00+08:00").unwrap(), + string_to_timestamp_ms("2022-02-02 19:00:00+08:00", None).unwrap(), ScalarValue::TimestampSecond(Some(1643799600), None) ); assert_eq!( - string_to_timestamp_ms("2009-02-13 23:31:30Z").unwrap(), + string_to_timestamp_ms("2009-02-13 23:31:30Z", None).unwrap(), ScalarValue::TimestampSecond(Some(1234567890), None) ); + + assert_eq!( + string_to_timestamp_ms( + "2009-02-13 23:31:30", + Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()) + ) + .unwrap(), + ScalarValue::TimestampSecond(Some(1234567890 - 8 * 3600), None) + ); + + assert_eq!( + string_to_timestamp_ms( + "2009-02-13 23:31:30", + Some(&Timezone::from_tz_string("-8:00").unwrap()) + ) + .unwrap(), + ScalarValue::TimestampSecond(Some(1234567890 + 8 * 3600), None) + ); } #[test] @@ -363,7 +400,10 @@ mod tests { ) .unwrap(), ); - let mut converter = TypeConverter { schema }; + let mut converter = TypeConverter { + schema, + query_ctx: QueryContext::arc(), + }; assert_eq!( Expr::Column(Column::from_name("ts")).gt(Expr::Literal(ScalarValue::TimestampSecond( @@ -395,7 +435,10 @@ mod tests { ) .unwrap(), ); - let mut converter = TypeConverter { schema }; + let mut converter = TypeConverter { + schema, + query_ctx: QueryContext::arc(), + }; assert_eq!( Expr::Column(Column::from_name(col_name)) @@ -442,9 +485,10 @@ mod tests { .unwrap() .build() .unwrap(); + let context = QueryEngineContext::mock(); let transformed_plan = TypeConversionRule - .analyze(plan, &ConfigOptions::default()) + .analyze(plan, &context, &ConfigOptions::default()) .unwrap(); let expected = String::from( "Aggregate: groupBy=[[]], aggr=[[COUNT(column1)]]\ @@ -457,6 +501,8 @@ mod tests { #[test] fn test_reverse_non_ts_type() { + let context = QueryEngineContext::mock(); + let plan = LogicalPlanBuilder::values(vec![vec![Expr::Literal(ScalarValue::Float64(Some(1.0)))]]) .unwrap() @@ -473,7 +519,7 @@ mod tests { .build() .unwrap(); let transformed_plan = TypeConversionRule - .analyze(plan, &ConfigOptions::default()) + .analyze(plan, &context, &ConfigOptions::default()) .unwrap(); let expected = String::from( "Filter: Utf8(\"1.2345\") < column1\ diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 7ab3df442626..0ecd38188636 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -168,7 +168,6 @@ impl QueryLanguageParser { } fn parse_promql_timestamp(timestamp: &str) -> Result { - // FIXME(dennis): aware of timezone // try rfc3339 format let rfc3339_result = DateTime::parse_from_rfc3339(timestamp) .context(ParseTimestampSnafu { raw: timestamp }) diff --git a/src/query/src/planner.rs 
b/src/query/src/planner.rs index 97e8d3d931ee..cf3f47761192 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -26,12 +26,12 @@ use session::context::QueryContextRef; use snafu::ResultExt; use sql::statements::statement::Statement; -use crate::error::{PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu}; +use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu}; use crate::parser::QueryStatement; use crate::plan::LogicalPlan; use crate::query_engine::QueryEngineState; use crate::range_select::plan_rewrite::RangePlanRewriter; -use crate::DfContextProviderAdapter; +use crate::{DfContextProviderAdapter, QueryEngineContext}; #[async_trait] pub trait LogicalPlanner: Send + Sync { @@ -66,7 +66,7 @@ impl DfLogicalPlanner { self.engine_state.clone(), self.session_state.clone(), &df_stmt, - query_ctx, + query_ctx.clone(), ) .await?; @@ -81,9 +81,17 @@ impl DfLogicalPlanner { let result = sql_to_rel .statement_to_plan(df_stmt) .context(PlanSqlSnafu)?; - let plan = RangePlanRewriter::new(table_provider) + let plan = RangePlanRewriter::new(table_provider, query_ctx.clone()) .rewrite(result) .await?; + + // Optimize logical plan by extension rules + let context = QueryEngineContext::new(self.session_state.clone(), query_ctx); + let plan = self + .engine_state + .optimize_by_extension_rules(plan, &context) + .context(DataFusionSnafu)?; + Ok(LogicalPlan::DfPlan(plan)) } diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index ec271a818fc9..2ae0b298281d 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -56,22 +56,40 @@ pub trait QueryEngine: Send + Sync { /// so that it can be downcast to a specific implementation. fn as_any(&self) -> &dyn Any; + /// Returns the logical planner fn planner(&self) -> Arc; + /// Returns the query engine name. fn name(&self) -> &str; - async fn describe(&self, plan: LogicalPlan) -> Result; + /// Describe the given [`LogicalPlan`]. + async fn describe( + &self, + plan: LogicalPlan, + query_ctx: QueryContextRef, + ) -> Result; + /// Execute the given [`LogicalPlan`]. async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result; + /// Register a [`ScalarUdf`]. fn register_udf(&self, udf: ScalarUdf); + /// Register an aggregate function. + /// + /// # Panics + /// Will panic if the function with same name is already registered. fn register_aggregate_function(&self, func: AggregateFunctionMetaRef); + /// Register a SQL function. + /// Will override if the function with same name is already registered. fn register_function(&self, func: FunctionRef); /// Create a DataFrame from a table. fn read_table(&self, table: TableRef) -> Result; + + /// Create a [`QueryEngineContext`]. + fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext; } pub struct QueryEngineFactory { @@ -118,6 +136,7 @@ impl QueryEngineFactory { } } +/// Register all functions implemented by GreptimeDB fn register_functions(query_engine: &Arc) { for func in FUNCTION_REGISTRY.functions() { query_engine.register_function(func); diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index 29eb2171c20e..d5361b4e5f2c 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -51,4 +51,23 @@ impl QueryEngineContext { state.runtime_env().clone(), )) } + + /// Mock an engine context for unit tests. 
+ #[cfg(any(test, feature = "test"))] + pub fn mock() -> Self { + use common_base::Plugins; + use session::context::QueryContext; + + use crate::query_engine::QueryEngineState; + + let state = Arc::new(QueryEngineState::new( + catalog::memory::new_memory_catalog_manager().unwrap(), + None, + None, + false, + Plugins::default(), + )); + + QueryEngineContext::new(state.session_state(), QueryContext::arc()) + } } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 4da21338df5c..a8259694fa88 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -19,9 +19,11 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_base::Plugins; +use common_function::function::FunctionRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::physical_plan::SessionContext; use common_query::prelude::ScalarUdf; +use common_telemetry::warn; use datafusion::catalog::MemoryCatalogList; use datafusion::dataframe::DataFrame; use datafusion::error::Result as DfResult; @@ -42,10 +44,12 @@ use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer}; use crate::optimizer::order_hint::OrderHintRule; use crate::optimizer::string_normalization::StringNormalizationRule; use crate::optimizer::type_conversion::TypeConversionRule; +use crate::optimizer::ExtensionAnalyzerRule; use crate::query_engine::options::QueryOptions; use crate::range_select::planner::RangeSelectPlanner; use crate::region_query::RegionQueryHandlerRef; use crate::table_mutation::TableMutationHandlerRef; +use crate::QueryEngineContext; /// Query engine global state // TODO(yingwen): This QueryEngineState still relies on datafusion, maybe we can define a trait for it, @@ -56,7 +60,9 @@ pub struct QueryEngineState { df_context: SessionContext, catalog_manager: CatalogManagerRef, table_mutation_handler: Option, + udf_functions: Arc>>, aggregate_functions: Arc>>, + extension_rules: Vec>, plugins: Plugins, } @@ -78,9 +84,12 @@ impl QueryEngineState { ) -> Self { let runtime_env = Arc::new(RuntimeEnv::default()); let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false); - // Apply the type conversion rule first. + // Apply extension rules + let mut extension_rules = Vec::new(); + // The [`TypeConversionRule`] must be at first + extension_rules.insert(0, Arc::new(TypeConversionRule) as _); + // Apply the datafusion rules let mut analyzer = Analyzer::new(); - analyzer.rules.insert(0, Arc::new(TypeConversionRule)); analyzer.rules.insert(0, Arc::new(StringNormalizationRule)); Self::remove_analyzer_rule(&mut analyzer.rules, CountWildcardRule {}.name()); analyzer.rules.insert(0, Arc::new(CountWildcardRule {})); @@ -110,7 +119,9 @@ impl QueryEngineState { catalog_manager: catalog_list, table_mutation_handler, aggregate_functions: Arc::new(RwLock::new(HashMap::new())), + extension_rules, plugins, + udf_functions: Arc::new(RwLock::new(HashMap::new())), } } @@ -118,12 +129,44 @@ impl QueryEngineState { rules.retain(|rule| rule.name() != name); } - /// Register a udf function - // TODO(dennis): manage UDFs by ourself. - pub fn register_udf(&self, udf: ScalarUdf) { - self.df_context.register_udf(udf.into_df_udf()); + /// Optimize the logical plan by the extension anayzer rules. 
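+    ///
+    /// Rules run in the order they were registered; each rule receives the plan
+    /// produced by the previous one (a plain `try_fold`). A call-site sketch,
+    /// mirroring the planner change in this patch (`state`, `query_ctx` and
+    /// `plan` are placeholders):
+    ///
+    /// ```ignore
+    /// let context = QueryEngineContext::new(state.session_state(), query_ctx);
+    /// let plan = state.optimize_by_extension_rules(plan, &context)?;
+    /// ```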
+ pub fn optimize_by_extension_rules( + &self, + plan: DfLogicalPlan, + context: &QueryEngineContext, + ) -> DfResult { + self.extension_rules + .iter() + .try_fold(plan, |acc_plan, rule| { + rule.analyze(acc_plan, context, self.session_state().config_options()) + }) + } + + /// Register an udf function. + /// Will override if the function with same name is already registered. + pub fn register_function(&self, func: FunctionRef) { + let name = func.name().to_string(); + let x = self + .udf_functions + .write() + .unwrap() + .insert(name.clone(), func); + + if x.is_some() { + warn!("Already registered udf function '{name}'"); + } } + /// Retrieve the udf function by name + pub fn udf_function(&self, function_name: &str) -> Option { + self.udf_functions + .read() + .unwrap() + .get(function_name) + .cloned() + } + + /// Retrieve the aggregate function by name pub fn aggregate_function(&self, function_name: &str) -> Option { self.aggregate_functions .read() @@ -132,6 +175,11 @@ impl QueryEngineState { .cloned() } + /// Register a [`ScalarUdf`]. + pub fn register_udf(&self, udf: ScalarUdf) { + self.df_context.register_udf(udf.into()); + } + /// Register an aggregate function. /// /// # Panics diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index dcb1ba7de5c8..5b93180b0d2b 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::BTreeSet; -use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -22,7 +21,7 @@ use async_recursion::async_recursion; use catalog::table_source::DfTableSourceProvider; use common_time::interval::NANOS_PER_MILLI; use common_time::timestamp::TimeUnit; -use common_time::{Interval, Timestamp}; +use common_time::{Interval, Timestamp, Timezone}; use datafusion::datasource::DefaultTableSource; use datafusion::prelude::Column; use datafusion::scalar::ScalarValue; @@ -35,6 +34,7 @@ use datafusion_expr::{ }; use datatypes::prelude::ConcreteDataType; use promql_parser::util::parse_duration; +use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; @@ -56,6 +56,7 @@ pub struct RangeExprRewriter<'a> { /// Use `BTreeSet` to avoid in case like `avg(a) RANGE '5m' + avg(a) RANGE '5m'`, duplicate range expr `avg(a) RANGE '5m'` be calculate twice range_fn: BTreeSet, sub_aggr: &'a Aggregate, + query_ctx: &'a QueryContextRef, } impl<'a> RangeExprRewriter<'a> { @@ -134,7 +135,7 @@ fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult { /// 1. NOW: align to current execute time /// 2. Timestamp string: align to specific timestamp /// 3. 
leave empty (as Default Option): align to unix epoch 0 -fn parse_align_to(args: &[Expr], i: usize) -> DFResult { +fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult { let s = parse_str_expr(args, i)?; let upper = s.to_uppercase(); match upper.as_str() { @@ -143,7 +144,8 @@ fn parse_align_to(args: &[Expr], i: usize) -> DFResult { "" => return Ok(0), _ => (), } - Timestamp::from_str(s) + + Timestamp::from_str(s, timezone) .map_err(|e| { DataFusionError::Plan(format!( "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}", @@ -206,7 +208,11 @@ impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> { .map_err(|e| DataFusionError::Plan(e.to_string()))?; let by = parse_expr_list(&func.args, 4, byc)?; let align = parse_duration_expr(&func.args, byc + 4)?; - let align_to = parse_align_to(&func.args, byc + 5)?; + let align_to = parse_align_to( + &func.args, + byc + 5, + Some(self.query_ctx.timezone().as_ref()), + )?; let mut data_type = range_expr.get_type(self.input_plan.schema())?; let mut need_cast = false; let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?; @@ -247,11 +253,15 @@ impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> { /// collecting info we need to generate RangeSelect Query LogicalPlan and rewrite th original LogicalPlan. pub struct RangePlanRewriter { table_provider: DfTableSourceProvider, + query_ctx: QueryContextRef, } impl RangePlanRewriter { - pub fn new(table_provider: DfTableSourceProvider) -> Self { - Self { table_provider } + pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self { + Self { + table_provider, + query_ctx, + } } pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result { @@ -295,6 +305,7 @@ impl RangePlanRewriter { by: vec![], range_fn: BTreeSet::new(), sub_aggr: aggr_plan, + query_ctx: &self.query_ctx, }; let new_expr = expr .iter() @@ -747,15 +758,28 @@ mod test { fn test_parse_align_to() { // test NOW let args = vec![Expr::Literal(ScalarValue::Utf8(Some("NOW".into())))]; - let epsinon = parse_align_to(&args, 0).unwrap() - Timestamp::current_millis().value(); + let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value(); assert!(epsinon.abs() < 100); // test default let args = vec![Expr::Literal(ScalarValue::Utf8(Some("".into())))]; - assert!(parse_align_to(&args, 0).unwrap() == 0); + assert!(parse_align_to(&args, 0, None).unwrap() == 0); // test Timestamp let args = vec![Expr::Literal(ScalarValue::Utf8(Some( "1970-01-01T00:00:00+08:00".into(), )))]; - assert!(parse_align_to(&args, 0).unwrap() == -8 * 60 * 60 * 1000); + assert!(parse_align_to(&args, 0, None).unwrap() == -8 * 60 * 60 * 1000); + // timezone + let args = vec![Expr::Literal(ScalarValue::Utf8(Some( + "1970-01-01T00:00:00".into(), + )))]; + assert!( + parse_align_to( + &args, + 0, + Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()) + ) + .unwrap() + == -8 * 60 * 60 * 1000 + ); } } diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs index 1c43cfd4b202..8496a648b680 100644 --- a/src/query/src/tests.rs +++ b/src/query/src/tests.rs @@ -40,7 +40,7 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec { let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).unwrap(); let plan = engine .planner() - .plan(stmt, QueryContext::arc()) + .plan(stmt, query_ctx.clone()) .await .unwrap(); let Output::Stream(stream) = engine.execute(plan, query_ctx).await.unwrap() else { diff --git 
a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 2730f28d9a84..70ff108271c1 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -150,7 +150,7 @@ impl Function for PyUDF { fn eval( &self, - _func_ctx: common_function::function::FunctionContext, + func_ctx: common_function::function::FunctionContext, columns: &[datatypes::vectors::VectorRef], ) -> common_query::error::Result { // FIXME(discord9): exec_parsed require a RecordBatch(basically a Vector+Schema), where schema can't pop out from nowhere, right? @@ -158,15 +158,17 @@ impl Function for PyUDF { let columns = columns.to_vec(); let rb = Some(RecordBatch::new(schema, columns).context(UdfTempRecordBatchSnafu)?); - // FIXME(dennis): Create EvalContext from FunctionContext. - let res = exec_parsed(&self.copr, &rb, &HashMap::new(), &EvalContext::default()).map_err( - |err| { - PyUdfSnafu { - msg: format!("{err:#?}"), - } - .build() + let res = exec_parsed( + &self.copr, + &rb, + &HashMap::new(), + &EvalContext { + query_ctx: func_ctx.query_ctx.clone(), }, - )?; + ) + .map_err(BoxedError::new) + .context(common_query::error::ExecuteSnafu)?; + let len = res.columns().len(); if len == 0 { return PyUdfSnafu { diff --git a/src/servers/src/mysql/federated.rs b/src/servers/src/mysql/federated.rs index 2efc45128a1f..5744894b9f91 100644 --- a/src/servers/src/mysql/federated.rs +++ b/src/servers/src/mysql/federated.rs @@ -37,8 +37,8 @@ static SHOW_LOWER_CASE_PATTERN: Lazy = Lazy::new(|| Regex::new("(?i)^(SHOW VARIABLES LIKE 'lower_case_table_names'(.*))").unwrap()); static SHOW_COLLATION_PATTERN: Lazy = Lazy::new(|| Regex::new("(?i)^(show collation where(.*))").unwrap()); -static SHOW_VARIABLES_PATTERN: Lazy = - Lazy::new(|| Regex::new("(?i)^(SHOW VARIABLES(.*))").unwrap()); +static SHOW_VARIABLES_LIKE_PATTERN: Lazy = + Lazy::new(|| Regex::new("(?i)^(SHOW VARIABLES( LIKE (.*))?)").unwrap()); static SELECT_DATABASE_PATTERN: Lazy = Lazy::new(|| Regex::new(r"(?i)^(SELECT DATABASE\(\s*\))").unwrap()); @@ -247,7 +247,8 @@ fn check_show_variables(query: &str) -> Option { Some(show_variables("sql_mode", "ONLY_FULL_GROUP_BY STRICT_TRANS_TABLES NO_ZERO_IN_DATE NO_ZERO_DATE ERROR_FOR_DIVISION_BY_ZERO NO_ENGINE_SUBSTITUTION")) } else if SHOW_LOWER_CASE_PATTERN.is_match(query) { Some(show_variables("lower_case_table_names", "0")) - } else if SHOW_COLLATION_PATTERN.is_match(query) || SHOW_VARIABLES_PATTERN.is_match(query) { + } else if SHOW_COLLATION_PATTERN.is_match(query) || SHOW_VARIABLES_LIKE_PATTERN.is_match(query) + { Some(show_variables("", "")) } else { None diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index dabde7e00013..ff60ae007fa6 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -99,10 +99,10 @@ impl SqlQueryHandler for DummyInstance { let plan = self .query_engine .planner() - .plan(QueryStatement::Sql(stmt), query_ctx) + .plan(QueryStatement::Sql(stmt), query_ctx.clone()) .await .unwrap(); - let schema = self.query_engine.describe(plan).await.unwrap(); + let schema = self.query_engine.describe(plan, query_ctx).await.unwrap(); Ok(Some(schema)) } else { Ok(None) diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index c8da838234b9..724c4cc42297 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -74,7 +74,7 @@ fn parse_string_to_value( match data_type { ConcreteDataType::String(_) => Ok(Value::String(s.into())), ConcreteDataType::Date(_) => { - if let Ok(date) = common_time::date::Date::from_str(&s) { + if let 
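+                // Judging by the names, `from_str_utc` here (and `from_str_system`
+                // for DateTime below) parses the literal against a fixed zone rather
+                // than the session timezone setting.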
Ok(date) = common_time::date::Date::from_str_utc(&s) { Ok(Value::Date(date)) } else { ParseSqlValueSnafu { @@ -84,7 +84,7 @@ fn parse_string_to_value( } } ConcreteDataType::DateTime(_) => { - if let Ok(datetime) = common_time::datetime::DateTime::from_str(&s) { + if let Ok(datetime) = common_time::datetime::DateTime::from_str_system(&s) { Ok(Value::DateTime(datetime)) } else { ParseSqlValueSnafu { @@ -94,7 +94,7 @@ fn parse_string_to_value( } } ConcreteDataType::Timestamp(t) => { - if let Ok(ts) = Timestamp::from_str(&s) { + if let Ok(ts) = Timestamp::from_str_utc(&s) { Ok(Value::Timestamp(ts.convert_to(t.unit()).context( TimestampOverflowSnafu { timestamp: ts, diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 514541f2dd5c..bd5d6d2d1818 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -19,6 +19,7 @@ use common_telemetry::{error, warn}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; +use datafusion::common::ScalarValue; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion_common::ToDFSchema; use datafusion_expr::expr::InList; @@ -34,6 +35,22 @@ use crate::error; #[cfg(test)] mod stats; +/// Assert the scalar value is not utf8. Returns `None` if it's utf8. +/// In theory, it should be converted to a timestamp scalar value by `TypeConversionRule`. +macro_rules! return_none_if_utf8 { + ($lit: ident) => { + if matches!($lit, ScalarValue::Utf8(_)) { + warn!( + "Unexpected ScalarValue::Utf8 in time range predicate: {:?}. Maybe it's an implicit bug, please report it to https://github.com/GreptimeTeam/greptimedb/issues", + $lit + ); + + // Make the predicate ineffective. + return None; + } + }; +} + #[derive(Debug, Clone)] pub struct Predicate { /// logical exprs @@ -282,7 +299,9 @@ impl<'a> TimeRangePredicateBuilder<'a> { if col.name != self.ts_col_name { return None; } - scalar_value_to_timestamp(lit).map(|t| (t, reverse)) + + return_none_if_utf8!(lit); + scalar_value_to_timestamp(lit, None).map(|t| (t, reverse)) } fn extract_from_between_expr( @@ -305,9 +324,12 @@ impl<'a> TimeRangePredicateBuilder<'a> { match (low, high) { (DfExpr::Literal(low), DfExpr::Literal(high)) => { - let low_opt = - scalar_value_to_timestamp(low).and_then(|ts| ts.convert_to(self.ts_col_unit)); - let high_opt = scalar_value_to_timestamp(high) + return_none_if_utf8!(low); + return_none_if_utf8!(high); + + let low_opt = scalar_value_to_timestamp(low, None) + .and_then(|ts| ts.convert_to(self.ts_col_unit)); + let high_opt = scalar_value_to_timestamp(high, None) .and_then(|ts| ts.convert_to_ceil(self.ts_col_unit)); Some(TimestampRange::new_inclusive(low_opt, high_opt)) } @@ -338,7 +360,8 @@ impl<'a> TimeRangePredicateBuilder<'a> { let mut init_range = TimestampRange::empty(); for expr in list { if let DfExpr::Literal(scalar) = expr { - if let Some(timestamp) = scalar_value_to_timestamp(scalar) { + return_none_if_utf8!(scalar); + if let Some(timestamp) = scalar_value_to_timestamp(scalar, None) { init_range = init_range.or(&TimestampRange::single(timestamp)) } else { // TODO(hl): maybe we should raise an error here since cannot parse diff --git a/tests/cases/standalone/common/system/timezone.result b/tests/cases/standalone/common/system/timezone.result new file mode 100644 index 000000000000..8f3c59fbd248 --- /dev/null +++ b/tests/cases/standalone/common/system/timezone.result @@ -0,0 +1,271 @@ +--- tests for timezone --- +SHOW VARIABLES time_zone; + ++-----------+ +| 
TIME_ZONE | ++-----------+ +| UTC | ++-----------+ + +SHOW VARIABLES system_time_zone; + ++------------------+ +| SYSTEM_TIME_ZONE | ++------------------+ +| UTC | ++------------------+ + +CREATE TABLE test(d double, ts timestamp_ms time index); + +Affected Rows: 0 + +INSERT INTO test values + (1, '2024-01-01 00:00:00'), + (2, '2024-01-02 08:00:00'), + (3, '2024-01-03 16:00:00'), + (4, '2024-01-04 00:00:00'), + (5, '2024-01-05 00:00:00+08:00'); + +Affected Rows: 5 + +SELECT * from test; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 1.0 | 2024-01-01T00:00:00 | +| 2.0 | 2024-01-02T08:00:00 | +| 3.0 | 2024-01-03T16:00:00 | +| 4.0 | 2024-01-04T00:00:00 | +| 5.0 | 2024-01-04T16:00:00 | ++-----+---------------------+ + +SELECT * from test where ts >= '2024-01-02 08:00:00'; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 2.0 | 2024-01-02T08:00:00 | +| 3.0 | 2024-01-03T16:00:00 | +| 4.0 | 2024-01-04T00:00:00 | +| 5.0 | 2024-01-04T16:00:00 | ++-----+---------------------+ + +SELECT * from test where ts <= '2024-01-03 16:00:00'; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 1.0 | 2024-01-01T00:00:00 | +| 2.0 | 2024-01-02T08:00:00 | +| 3.0 | 2024-01-03T16:00:00 | ++-----+---------------------+ + +select date_format(ts, '%Y-%m-%d %H:%M:%S:%3f') from test; + ++----------------------------------------------------+ +| date_format(test.ts,Utf8("%Y-%m-%d %H:%M:%S:%3f")) | ++----------------------------------------------------+ +| 2024-01-01 00:00:00:000 | +| 2024-01-02 08:00:00:000 | +| 2024-01-03 16:00:00:000 | +| 2024-01-04 00:00:00:000 | +| 2024-01-04 16:00:00:000 | ++----------------------------------------------------+ + +select to_unixtime('2024-01-02 00:00:00'); + ++------------------------------------------+ +| to_unixtime(Utf8("2024-01-02 00:00:00")) | ++------------------------------------------+ +| 1704153600 | ++------------------------------------------+ + +select to_unixtime('2024-01-02T00:00:00+08:00'); + ++------------------------------------------------+ +| to_unixtime(Utf8("2024-01-02T00:00:00+08:00")) | ++------------------------------------------------+ +| 1704124800 | ++------------------------------------------------+ + +--- UTC+8 --- +SET TIME_ZONE = '+8:00'; + +Affected Rows: 0 + +SHOW VARIABLES time_zone; + ++-----------+ +| TIME_ZONE | ++-----------+ +| +08:00 | ++-----------+ + +SHOW VARIABLES system_time_zone; + ++------------------+ +| SYSTEM_TIME_ZONE | ++------------------+ +| UTC | ++------------------+ + +SELECT * from test; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 1.0 | 2024-01-01T00:00:00 | +| 2.0 | 2024-01-02T08:00:00 | +| 3.0 | 2024-01-03T16:00:00 | +| 4.0 | 2024-01-04T00:00:00 | +| 5.0 | 2024-01-04T16:00:00 | ++-----+---------------------+ + +SELECT * from test where ts >= '2024-01-02 08:00:00'; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 2.0 | 2024-01-02T08:00:00 | +| 3.0 | 2024-01-03T16:00:00 | +| 4.0 | 2024-01-04T00:00:00 | +| 5.0 | 2024-01-04T16:00:00 | ++-----+---------------------+ + +SELECT * from test where ts <= '2024-01-03 16:00:00'; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 1.0 | 2024-01-01T00:00:00 | +| 2.0 | 2024-01-02T08:00:00 | ++-----+---------------------+ + +select date_format(ts, '%Y-%m-%d %H:%M:%S:%3f') from test; + ++----------------------------------------------------+ +| date_format(test.ts,Utf8("%Y-%m-%d %H:%M:%S:%3f")) | 
++----------------------------------------------------+ +| 2024-01-01 08:00:00:000 | +| 2024-01-02 16:00:00:000 | +| 2024-01-04 00:00:00:000 | +| 2024-01-04 08:00:00:000 | +| 2024-01-05 00:00:00:000 | ++----------------------------------------------------+ + +select to_unixtime('2024-01-02 00:00:00'); + ++------------------------------------------+ +| to_unixtime(Utf8("2024-01-02 00:00:00")) | ++------------------------------------------+ +| 1704124800 | ++------------------------------------------+ + +select to_unixtime('2024-01-02 00:00:00+08:00'); + ++------------------------------------------------+ +| to_unixtime(Utf8("2024-01-02 00:00:00+08:00")) | ++------------------------------------------------+ +| 1704124800 | ++------------------------------------------------+ + +--- UTC-8 --- +SET TIME_ZONE = '-8:00'; + +Affected Rows: 0 + +SHOW VARIABLES time_zone; + ++-----------+ +| TIME_ZONE | ++-----------+ +| -08:00 | ++-----------+ + +SHOW VARIABLES system_time_zone; + ++------------------+ +| SYSTEM_TIME_ZONE | ++------------------+ +| UTC | ++------------------+ + +SELECT * from test; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 1.0 | 2024-01-01T00:00:00 | +| 2.0 | 2024-01-02T08:00:00 | +| 3.0 | 2024-01-03T16:00:00 | +| 4.0 | 2024-01-04T00:00:00 | +| 5.0 | 2024-01-04T16:00:00 | ++-----+---------------------+ + +SELECT * from test where ts >= '2024-01-02 08:00:00'; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 3.0 | 2024-01-03T16:00:00 | +| 4.0 | 2024-01-04T00:00:00 | +| 5.0 | 2024-01-04T16:00:00 | ++-----+---------------------+ + +SELECT * from test where ts <= '2024-01-03 16:00:00'; + ++-----+---------------------+ +| d | ts | ++-----+---------------------+ +| 1.0 | 2024-01-01T00:00:00 | +| 2.0 | 2024-01-02T08:00:00 | +| 3.0 | 2024-01-03T16:00:00 | +| 4.0 | 2024-01-04T00:00:00 | ++-----+---------------------+ + +select date_format(ts, '%Y-%m-%d %H:%M:%S:%3f') from test; + ++----------------------------------------------------+ +| date_format(test.ts,Utf8("%Y-%m-%d %H:%M:%S:%3f")) | ++----------------------------------------------------+ +| 2023-12-31 16:00:00:000 | +| 2024-01-02 00:00:00:000 | +| 2024-01-03 08:00:00:000 | +| 2024-01-03 16:00:00:000 | +| 2024-01-04 08:00:00:000 | ++----------------------------------------------------+ + +select to_unixtime('2024-01-02 00:00:00'); + ++------------------------------------------+ +| to_unixtime(Utf8("2024-01-02 00:00:00")) | ++------------------------------------------+ +| 1704182400 | ++------------------------------------------+ + +select to_unixtime('2024-01-02 00:00:00+08:00'); + ++------------------------------------------------+ +| to_unixtime(Utf8("2024-01-02 00:00:00+08:00")) | ++------------------------------------------------+ +| 1704124800 | ++------------------------------------------------+ + +drop table test; + +Affected Rows: 0 + +-- revert timezone to UTC +SET TIME_ZONE = 'UTC'; + +Affected Rows: 0 + +SHOW VARIABLES time_zone; + ++-----------+ +| TIME_ZONE | ++-----------+ +| UTC | ++-----------+ + diff --git a/tests/cases/standalone/common/system/timezone.sql b/tests/cases/standalone/common/system/timezone.sql new file mode 100644 index 000000000000..4b3d5f4f2d14 --- /dev/null +++ b/tests/cases/standalone/common/system/timezone.sql @@ -0,0 +1,70 @@ +--- tests for timezone --- +SHOW VARIABLES time_zone; + +SHOW VARIABLES system_time_zone; + +CREATE TABLE test(d double, ts timestamp_ms time index); + +INSERT INTO test values + (1, '2024-01-01 00:00:00'), + 
(2, '2024-01-02 08:00:00'), + (3, '2024-01-03 16:00:00'), + (4, '2024-01-04 00:00:00'), + (5, '2024-01-05 00:00:00+08:00'); + +SELECT * from test; + +SELECT * from test where ts >= '2024-01-02 08:00:00'; + +SELECT * from test where ts <= '2024-01-03 16:00:00'; + +select date_format(ts, '%Y-%m-%d %H:%M:%S:%3f') from test; + +select to_unixtime('2024-01-02 00:00:00'); + +select to_unixtime('2024-01-02T00:00:00+08:00'); + +--- UTC+8 --- +SET TIME_ZONE = '+8:00'; + +SHOW VARIABLES time_zone; + +SHOW VARIABLES system_time_zone; + +SELECT * from test; + +SELECT * from test where ts >= '2024-01-02 08:00:00'; + +SELECT * from test where ts <= '2024-01-03 16:00:00'; + +select date_format(ts, '%Y-%m-%d %H:%M:%S:%3f') from test; + +select to_unixtime('2024-01-02 00:00:00'); + +select to_unixtime('2024-01-02 00:00:00+08:00'); + +--- UTC-8 --- +SET TIME_ZONE = '-8:00'; + +SHOW VARIABLES time_zone; + +SHOW VARIABLES system_time_zone; + +SELECT * from test; + +SELECT * from test where ts >= '2024-01-02 08:00:00'; + +SELECT * from test where ts <= '2024-01-03 16:00:00'; + +select date_format(ts, '%Y-%m-%d %H:%M:%S:%3f') from test; + +select to_unixtime('2024-01-02 00:00:00'); + +select to_unixtime('2024-01-02 00:00:00+08:00'); + +drop table test; + +-- revert timezone to UTC +SET TIME_ZONE = 'UTC'; + +SHOW VARIABLES time_zone; diff --git a/tests/cases/standalone/common/types/timestamp/timestamp.result b/tests/cases/standalone/common/types/timestamp/timestamp.result index 6eaeb2e7a4f1..2c21d1390ca1 100644 --- a/tests/cases/standalone/common/types/timestamp/timestamp.result +++ b/tests/cases/standalone/common/types/timestamp/timestamp.result @@ -101,8 +101,21 @@ SELECT t%t FROM timestamp; Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(Millisecond, None) % Timestamp(Millisecond, None): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(Millisecond, None) % Timestamp(Millisecond, None) --- TODO(dennis): It can't run on distributed mode, uncomment it when the issue is fixed: https://github.com/GreptimeTeam/greptimedb/issues/2071 -- --- SELECT t-t FROM timestamp; -- +SELECT t-t FROM timestamp; + ++---------------------------+ +| timestamp.t - timestamp.t | ++---------------------------+ +| PT0S | +| | +| PT0S | +| PT0S | +| PT0S | +| PT0S | +| PT0S | +| PT0S | ++---------------------------+ + SELECT EXTRACT(YEAR from TIMESTAMP '1992-01-01 01:01:01'); +-----------------------------------------------------+ diff --git a/tests/cases/standalone/common/types/timestamp/timestamp.sql b/tests/cases/standalone/common/types/timestamp/timestamp.sql index aa97e778bfa8..e2924bc2e0f6 100644 --- a/tests/cases/standalone/common/types/timestamp/timestamp.sql +++ b/tests/cases/standalone/common/types/timestamp/timestamp.sql @@ -32,8 +32,7 @@ SELECT t/t FROM timestamp; SELECT t%t FROM timestamp; --- TODO(dennis): It can't run on distributed mode, uncomment it when the issue is fixed: https://github.com/GreptimeTeam/greptimedb/issues/2071 -- --- SELECT t-t FROM timestamp; -- +SELECT t-t FROM timestamp; SELECT EXTRACT(YEAR from TIMESTAMP '1992-01-01 01:01:01'); diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 2378f088513e..8d1f189b7266 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -411,13 +411,32 @@ impl Database for GreptimeDB { } let mut client = self.client.lock().await; + if query.trim().to_lowercase().starts_with("use ") { + // use [db] let database = query .split_ascii_whitespace() 
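+            // `USE <db>`: take the token right after `use` and drop any trailing ';'.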
.nth(1) .expect("Illegal `USE` statement: expecting a database.") .trim_end_matches(';'); client.set_schema(database); + Box::new(ResultDisplayer { + result: Ok(Output::AffectedRows(0)), + }) as _ + } else if query.trim().to_lowercase().starts_with("set time_zone") { + // set time_zone='xxx' + let timezone = query + .split('=') + .nth(1) + .expect("Illegal `SET TIMEZONE` statement: expecting a timezone expr.") + .trim() + .strip_prefix('\'') + .unwrap() + .strip_suffix("';") + .unwrap(); + + client.set_timezone(timezone); + Box::new(ResultDisplayer { result: Ok(Output::AffectedRows(0)), }) as _
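+            // e.g. `SET TIME_ZONE = '+8:00';`: the surrounding quotes and trailing `;`
+            // are stripped, so the client receives the raw string `+8:00`.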