From 0f992183867c145330ec5602b6f50c13a3b9840d Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sun, 22 Sep 2024 10:39:38 +0800 Subject: [PATCH] feat: list/array/timezone support for postgres output (#4727) * feat: list/array support for postgres output * fix: implement time zone support for postgrsql * feat: add a geohash function that returns array * fix: typo * fix: lint warnings * test: add sqlness test * refactor: check resolution range before convert value * fix: test result for sqlness * feat: upgrade pgwire apis --- Cargo.lock | 42 +- src/common/function/src/scalars/geo.rs | 3 +- .../function/src/scalars/geo/geohash.rs | 198 +++++++- src/servers/Cargo.toml | 2 +- src/servers/src/postgres.rs | 35 +- src/servers/src/postgres/auth_handler.rs | 4 +- src/servers/src/postgres/handler.rs | 6 +- src/servers/src/postgres/server.rs | 10 +- src/servers/src/postgres/types.rs | 448 ++++++++++++++++-- src/servers/src/postgres/types/datetime.rs | 61 ++- tests-integration/tests/sql.rs | 32 ++ .../standalone/common/function/geo.result | 12 +- .../cases/standalone/common/function/geo.sql | 2 + 13 files changed, 742 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 000d6316ecca..013b0a205abd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3303,9 +3303,9 @@ dependencies = [ [[package]] name = "derive-new" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad" +checksum = "2cdc8d50f426189eef89dac62fabfa0abb27d5cc008f25bf4156a0203325becc" dependencies = [ "proc-macro2", "quote", @@ -5760,6 +5760,29 @@ dependencies = [ "serde", ] +[[package]] +name = "lazy-regex" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d8e41c97e6bc7ecb552016274b99fbb5d035e8de288c582d9b933af6677bfda" +dependencies = [ + "lazy-regex-proc_macros", + "once_cell", + "regex-lite", +] + +[[package]] +name = "lazy-regex-proc_macros" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76e1d8b05d672c53cb9c7b920bbba8783845ae4f0b076e02a3db1d02c81b4163" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.66", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -7856,20 +7879,22 @@ dependencies = [ [[package]] name = "pgwire" -version = "0.22.0" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3770f56e1e8a608c6de40011b9a00c6b669c14d121024411701b4bc3b2a5be99" +checksum = "ed4ca46dd335b3a030d977be54dfe121b1b9fe22aa8bbd69161ac2434524fc68" dependencies = [ "async-trait", "bytes", "chrono", - "derive-new 0.6.0", + "derive-new 0.7.0", "futures", "hex", + "lazy-regex", "md5", "postgres-types", "rand", "ring 0.17.8", + "rust_decimal", "thiserror", "tokio", "tokio-rustls 0.26.0", @@ -9075,6 +9100,12 @@ dependencies = [ "regex-syntax 0.8.4", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -9550,6 +9581,7 @@ dependencies = [ "borsh", "bytes", "num-traits", + "postgres-types", "rand", "rkyv", "serde", diff --git a/src/common/function/src/scalars/geo.rs b/src/common/function/src/scalars/geo.rs index 9e415a3fdddb..8ad6a7aef22a 100644 --- a/src/common/function/src/scalars/geo.rs +++ b/src/common/function/src/scalars/geo.rs @@ -16,7 +16,7 @@ use std::sync::Arc; mod geohash; mod h3; -use geohash::GeohashFunction; +use geohash::{GeohashFunction, GeohashNeighboursFunction}; use crate::function_registry::FunctionRegistry; @@ -26,6 +26,7 @@ impl GeoFunctions { pub fn register(registry: &FunctionRegistry) { // geohash registry.register(Arc::new(GeohashFunction)); + registry.register(Arc::new(GeohashNeighboursFunction)); // h3 family registry.register(Arc::new(h3::H3LatLngToCell)); registry.register(Arc::new(h3::H3LatLngToCellString)); diff --git a/src/common/function/src/scalars/geo/geohash.rs b/src/common/function/src/scalars/geo/geohash.rs index 2daa8223ccd6..d35a6a06ffe9 100644 --- a/src/common/function/src/scalars/geo/geohash.rs +++ b/src/common/function/src/scalars/geo/geohash.rs @@ -20,23 +20,69 @@ use common_query::error::{self, InvalidFuncArgsSnafu, Result}; use common_query::prelude::{Signature, TypeSignature}; use datafusion::logical_expr::Volatility; use datatypes::prelude::ConcreteDataType; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::value::Value; -use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef}; +use datatypes::scalars::{Scalar, ScalarVectorBuilder}; +use datatypes::value::{ListValue, Value}; +use datatypes::vectors::{ListVectorBuilder, MutableVector, StringVectorBuilder, VectorRef}; use geohash::Coord; use snafu::{ensure, ResultExt}; use crate::function::{Function, FunctionContext}; +macro_rules! ensure_resolution_usize { + ($v: ident) => { + if !($v > 0 && $v <= 12) { + Err(BoxedError::new(PlainError::new( + format!("Invalid geohash resolution {}, expect value: [1, 12]", $v), + StatusCode::EngineExecuteQuery, + ))) + .context(error::ExecuteSnafu) + } else { + Ok($v as usize) + } + }; +} + +fn try_into_resolution(v: Value) -> Result { + match v { + Value::Int8(v) => { + ensure_resolution_usize!(v) + } + Value::Int16(v) => { + ensure_resolution_usize!(v) + } + Value::Int32(v) => { + ensure_resolution_usize!(v) + } + Value::Int64(v) => { + ensure_resolution_usize!(v) + } + Value::UInt8(v) => { + ensure_resolution_usize!(v) + } + Value::UInt16(v) => { + ensure_resolution_usize!(v) + } + Value::UInt32(v) => { + ensure_resolution_usize!(v) + } + Value::UInt64(v) => { + ensure_resolution_usize!(v) + } + _ => unreachable!(), + } +} + /// Function that return geohash string for a given geospatial coordinate. #[derive(Clone, Debug, Default)] pub struct GeohashFunction; -const NAME: &str = "geohash"; +impl GeohashFunction { + const NAME: &'static str = "geohash"; +} impl Function for GeohashFunction { fn name(&self) -> &str { - NAME + Self::NAME } fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { @@ -93,17 +139,7 @@ impl Function for GeohashFunction { for i in 0..size { let lat = lat_vec.get(i).as_f64_lossy(); let lon = lon_vec.get(i).as_f64_lossy(); - let r = match resolution_vec.get(i) { - Value::Int8(v) => v as usize, - Value::Int16(v) => v as usize, - Value::Int32(v) => v as usize, - Value::Int64(v) => v as usize, - Value::UInt8(v) => v as usize, - Value::UInt16(v) => v as usize, - Value::UInt32(v) => v as usize, - Value::UInt64(v) => v as usize, - _ => unreachable!(), - }; + let r = try_into_resolution(resolution_vec.get(i))?; let result = match (lat, lon) { (Some(lat), Some(lon)) => { @@ -130,6 +166,134 @@ impl Function for GeohashFunction { impl fmt::Display for GeohashFunction { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", NAME) + write!(f, "{}", Self::NAME) + } +} + +/// Function that return geohash string for a given geospatial coordinate. +#[derive(Clone, Debug, Default)] +pub struct GeohashNeighboursFunction; + +impl GeohashNeighboursFunction { + const NAME: &'static str = "geohash_neighbours"; +} + +impl Function for GeohashNeighboursFunction { + fn name(&self) -> &str { + GeohashNeighboursFunction::NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::list_datatype( + ConcreteDataType::string_datatype(), + )) + } + + fn signature(&self) -> Signature { + let mut signatures = Vec::new(); + for coord_type in &[ + ConcreteDataType::float32_datatype(), + ConcreteDataType::float64_datatype(), + ] { + for resolution_type in &[ + ConcreteDataType::int8_datatype(), + ConcreteDataType::int16_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::uint8_datatype(), + ConcreteDataType::uint16_datatype(), + ConcreteDataType::uint32_datatype(), + ConcreteDataType::uint64_datatype(), + ] { + signatures.push(TypeSignature::Exact(vec![ + // latitude + coord_type.clone(), + // longitude + coord_type.clone(), + // resolution + resolution_type.clone(), + ])); + } + } + Signature::one_of(signatures, Volatility::Stable) + } + + fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + ensure!( + columns.len() == 3, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 3, provided : {}", + columns.len() + ), + } + ); + + let lat_vec = &columns[0]; + let lon_vec = &columns[1]; + let resolution_vec = &columns[2]; + + let size = lat_vec.len(); + let mut results = + ListVectorBuilder::with_type_capacity(ConcreteDataType::string_datatype(), size); + + for i in 0..size { + let lat = lat_vec.get(i).as_f64_lossy(); + let lon = lon_vec.get(i).as_f64_lossy(); + let r = try_into_resolution(resolution_vec.get(i))?; + + let result = match (lat, lon) { + (Some(lat), Some(lon)) => { + let coord = Coord { x: lon, y: lat }; + let encoded = geohash::encode(coord, r) + .map_err(|e| { + BoxedError::new(PlainError::new( + format!("Geohash error: {}", e), + StatusCode::EngineExecuteQuery, + )) + }) + .context(error::ExecuteSnafu)?; + let neighbours = geohash::neighbors(&encoded) + .map_err(|e| { + BoxedError::new(PlainError::new( + format!("Geohash error: {}", e), + StatusCode::EngineExecuteQuery, + )) + }) + .context(error::ExecuteSnafu)?; + Some(ListValue::new( + vec![ + neighbours.n, + neighbours.nw, + neighbours.w, + neighbours.sw, + neighbours.s, + neighbours.se, + neighbours.e, + neighbours.ne, + ] + .into_iter() + .map(Value::from) + .collect(), + ConcreteDataType::string_datatype(), + )) + } + _ => None, + }; + + if let Some(list_value) = result { + results.push(Some(list_value.as_scalar_ref())); + } else { + results.push(None); + } + } + + Ok(results.to_vector()) + } +} + +impl fmt::Display for GeohashNeighboursFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", GeohashNeighboursFunction::NAME) } } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 54665b8c686c..b94fa17d44c0 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -71,7 +71,7 @@ openmetrics-parser = "0.4" opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "6bbc3b65e6b19212c4f7fc4f40c20daf6f452deb" } opentelemetry-proto.workspace = true parking_lot = "0.12" -pgwire = { version = "0.22", default-features = false, features = ["server-api-ring"] } +pgwire = { version = "0.24.2", default-features = false, features = ["server-api-ring"] } pin-project = "1.0" pipeline.workspace = true postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] } diff --git a/src/servers/src/postgres.rs b/src/servers/src/postgres.rs index 36f6730b43d7..5e8de2294e18 100644 --- a/src/servers/src/postgres.rs +++ b/src/servers/src/postgres.rs @@ -32,7 +32,8 @@ use std::sync::Arc; use ::auth::UserProviderRef; use derive_builder::Builder; use pgwire::api::auth::ServerParameterProvider; -use pgwire::api::ClientInfo; +use pgwire::api::copy::NoopCopyHandler; +use pgwire::api::{ClientInfo, PgWireHandlerFactory}; pub use server::PostgresServer; use session::context::Channel; use session::Session; @@ -68,7 +69,7 @@ impl ServerParameterProvider for GreptimeDBStartupParameters { } } -pub struct PostgresServerHandler { +pub struct PostgresServerHandlerInner { query_handler: ServerSqlQueryHandlerRef, login_verifier: PgLoginVerifier, force_tls: bool, @@ -87,10 +88,35 @@ pub(crate) struct MakePostgresServerHandler { force_tls: bool, } +pub(crate) struct PostgresServerHandler(Arc); + +impl PgWireHandlerFactory for PostgresServerHandler { + type StartupHandler = PostgresServerHandlerInner; + type SimpleQueryHandler = PostgresServerHandlerInner; + type ExtendedQueryHandler = PostgresServerHandlerInner; + type CopyHandler = NoopCopyHandler; + + fn simple_query_handler(&self) -> Arc { + self.0.clone() + } + + fn extended_query_handler(&self) -> Arc { + self.0.clone() + } + + fn startup_handler(&self) -> Arc { + self.0.clone() + } + + fn copy_handler(&self) -> Arc { + Arc::new(NoopCopyHandler) + } +} + impl MakePostgresServerHandler { fn make(&self, addr: Option) -> PostgresServerHandler { let session = Arc::new(Session::new(addr, Channel::Postgres, Default::default())); - PostgresServerHandler { + let handler = PostgresServerHandlerInner { query_handler: self.query_handler.clone(), login_verifier: PgLoginVerifier::new(self.user_provider.clone()), force_tls: self.force_tls, @@ -98,6 +124,7 @@ impl MakePostgresServerHandler { session: session.clone(), query_parser: Arc::new(DefaultQueryParser::new(self.query_handler.clone(), session)), - } + }; + PostgresServerHandler(Arc::new(handler)) } } diff --git a/src/servers/src/postgres/auth_handler.rs b/src/servers/src/postgres/auth_handler.rs index 83d508215d72..3f3360385840 100644 --- a/src/servers/src/postgres/auth_handler.rs +++ b/src/servers/src/postgres/auth_handler.rs @@ -29,7 +29,7 @@ use pgwire::messages::{PgWireBackendMessage, PgWireFrontendMessage}; use session::Session; use snafu::IntoError; -use super::PostgresServerHandler; +use super::PostgresServerHandlerInner; use crate::error::{AuthSnafu, Result}; use crate::metrics::METRIC_AUTH_FAILURE; use crate::postgres::types::PgErrorCode; @@ -127,7 +127,7 @@ where } #[async_trait] -impl StartupHandler for PostgresServerHandler { +impl StartupHandler for PostgresServerHandlerInner { async fn on_startup( &self, client: &mut C, diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index e10a45ddecec..158e2cab4da9 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -39,13 +39,13 @@ use sql::dialect::PostgreSqlDialect; use sql::parser::{ParseOptions, ParserContext}; use super::types::*; -use super::{fixtures, PostgresServerHandler}; +use super::{fixtures, PostgresServerHandlerInner}; use crate::error::Result; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::SqlPlan; #[async_trait] -impl SimpleQueryHandler for PostgresServerHandler { +impl SimpleQueryHandler for PostgresServerHandlerInner { #[tracing::instrument(skip_all, fields(protocol = "postgres"))] async fn do_query<'a, C>( &self, @@ -237,7 +237,7 @@ impl QueryParser for DefaultQueryParser { } #[async_trait] -impl ExtendedQueryHandler for PostgresServerHandler { +impl ExtendedQueryHandler for PostgresServerHandlerInner { type Statement = SqlPlan; type QueryParser = DefaultQueryParser; diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index cca9c43181cc..e904845547d6 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -94,14 +94,8 @@ impl PostgresServer { let _handle = io_runtime.spawn(async move { crate::metrics::METRIC_POSTGRES_CONNECTIONS.inc(); let pg_handler = Arc::new(handler_maker.make(addr)); - let r = process_socket( - io_stream, - tls_acceptor.clone(), - pg_handler.clone(), - pg_handler.clone(), - pg_handler, - ) - .await; + let r = + process_socket(io_stream, tls_acceptor.clone(), pg_handler).await; crate::metrics::METRIC_POSTGRES_CONNECTIONS.dec(); r }); diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index 85b626cb1ac6..2e4a805ef0bc 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -20,13 +20,14 @@ mod interval; use std::collections::HashMap; use std::ops::Deref; -use chrono::{NaiveDate, NaiveDateTime}; +use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use common_time::Interval; use datafusion_common::ScalarValue; use datafusion_expr::LogicalPlan; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::Schema; use datatypes::types::TimestampType; +use datatypes::value::ListValue; use pgwire::api::portal::{Format, Portal}; use pgwire::api::results::{DataRowEncoder, FieldInfo}; use pgwire::api::Type; @@ -58,6 +59,317 @@ pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result>>() } +fn encode_array( + query_ctx: &QueryContextRef, + value_list: &ListValue, + builder: &mut DataRowEncoder, +) -> PgWireResult<()> { + match value_list.datatype() { + &ConcreteDataType::Boolean(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Boolean(v) => Ok(Some(*v)), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected bool",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Int8(v) => Ok(Some(*v)), + Value::UInt8(v) => Ok(Some(*v as i8)), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "Invalid list item type, find {v:?}, expected int8 or uint8", + ), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Int16(v) => Ok(Some(*v)), + Value::UInt16(v) => Ok(Some(*v as i16)), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "Invalid list item type, find {v:?}, expected int16 or uint16", + ), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Int32(v) => Ok(Some(*v)), + Value::UInt32(v) => Ok(Some(*v as i32)), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "Invalid list item type, find {v:?}, expected int32 or uint32", + ), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Int64(v) => Ok(Some(*v)), + Value::UInt64(v) => Ok(Some(*v as i64)), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "Invalid list item type, find {v:?}, expected int64 or uint64", + ), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Float32(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Float32(v) => Ok(Some(v.0)), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected float32",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Float64(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Float64(v) => Ok(Some(v.0)), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected float64",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Binary(_) => { + let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output(); + + match *bytea_output { + PGByteaOutputValue::ESCAPE => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))), + + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "Invalid list item type, find {v:?}, expected binary", + ), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + PGByteaOutputValue::HEX => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))), + + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "Invalid list item type, find {v:?}, expected binary", + ), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + } + } + &ConcreteDataType::String(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::String(v) => Ok(Some(v.as_utf8())), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected string",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Date(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Date(v) => { + if let Some(date) = v.to_chrono_date() { + let (style, order) = + *query_ctx.configuration_parameter().pg_datetime_style(); + Ok(Some(StylingDate(date, style, order))) + } else { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Failed to convert date to postgres type {v:?}",), + }))) + } + } + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected date",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::DateTime(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::DateTime(v) => { + if let Some(datetime) = + v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone())) + { + let (style, order) = + *query_ctx.configuration_parameter().pg_datetime_style(); + Ok(Some(StylingDateTime(datetime, style, order))) + } else { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Failed to convert date to postgres type {v:?}",), + }))) + } + } + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected date",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Timestamp(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Timestamp(v) => { + if let Some(datetime) = + v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone())) + { + let (style, order) = + *query_ctx.configuration_parameter().pg_datetime_style(); + Ok(Some(StylingDateTime(datetime, style, order))) + } else { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Failed to convert date to postgres type {v:?}",), + }))) + } + } + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Time(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Time(v) => Ok(v.to_chrono_time()), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected time",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Interval(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Interval(v) => Ok(Some(PgInterval::from(*v))), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected interval",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Decimal128(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Decimal128(v) => Ok(Some(v.to_string())), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected decimal",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + &ConcreteDataType::Json(_) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Binary(v) => Ok(Some(jsonb::to_string(v))), + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected json",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "cannot write array type {:?} in postgres protocol: unimplemented", + value_list.datatype() + ), + }))), + } +} + pub(super) fn encode_value( query_ctx: &QueryContextRef, value: &Value, @@ -93,7 +405,7 @@ pub(super) fn encode_value( Value::Date(v) => { if let Some(date) = v.to_chrono_date() { let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style(); - builder.encode_field(&StylingDate(&date, style, order)) + builder.encode_field(&StylingDate(date, style, order)) } else { Err(PgWireError::ApiError(Box::new(Error::Internal { err_msg: format!("Failed to convert date to postgres type {v:?}",), @@ -101,9 +413,10 @@ pub(super) fn encode_value( } } Value::DateTime(v) => { - if let Some(datetime) = v.to_chrono_datetime() { + if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone())) + { let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style(); - builder.encode_field(&StylingDateTime(&datetime, style, order)) + builder.encode_field(&StylingDateTime(datetime, style, order)) } else { Err(PgWireError::ApiError(Box::new(Error::Internal { err_msg: format!("Failed to convert date to postgres type {v:?}",), @@ -111,9 +424,10 @@ pub(super) fn encode_value( } } Value::Timestamp(v) => { - if let Some(datetime) = v.to_chrono_datetime() { + if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone())) + { let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style(); - builder.encode_field(&StylingDateTime(&datetime, style, order)) + builder.encode_field(&StylingDateTime(datetime, style, order)) } else { Err(PgWireError::ApiError(Box::new(Error::Internal { err_msg: format!("Failed to convert date to postgres type {v:?}",), @@ -131,14 +445,13 @@ pub(super) fn encode_value( } Value::Interval(v) => builder.encode_field(&PgInterval::from(*v)), Value::Decimal128(v) => builder.encode_field(&v.to_string()), - Value::List(_) | Value::Duration(_) => { - Err(PgWireError::ApiError(Box::new(Error::Internal { - err_msg: format!( - "cannot write value {:?} in postgres protocol: unimplemented", - &value - ), - }))) - } + Value::List(values) => encode_array(query_ctx, values, builder), + Value::Duration(_) => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "cannot write value {:?} in postgres protocol: unimplemented", + &value + ), + }))), } } @@ -155,19 +468,45 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { &ConcreteDataType::Binary(_) => Ok(Type::BYTEA), &ConcreteDataType::String(_) => Ok(Type::VARCHAR), &ConcreteDataType::Date(_) => Ok(Type::DATE), - &ConcreteDataType::DateTime(_) => Ok(Type::TIMESTAMP), - &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP), + &ConcreteDataType::DateTime(_) | &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP), &ConcreteDataType::Time(_) => Ok(Type::TIME), &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL), &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC), &ConcreteDataType::Json(_) => Ok(Type::JSON), - &ConcreteDataType::Duration(_) - | &ConcreteDataType::List(_) - | &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu { - data_type: origin, - reason: "not implemented", + ConcreteDataType::List(list) => match list.item_type() { + &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN), + &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY), + &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY), + &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY), + &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY), + &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY), + &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY), + &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY), + &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY), + &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY), + &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY), + &ConcreteDataType::DateTime(_) | &ConcreteDataType::Timestamp(_) => { + Ok(Type::TIMESTAMP_ARRAY) + } + &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY), + &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY), + &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY), + &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY), + &ConcreteDataType::Duration(_) + | &ConcreteDataType::Dictionary(_) + | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu { + data_type: origin, + reason: "not implemented", + } + .fail(), + }, + &ConcreteDataType::Duration(_) | &ConcreteDataType::Dictionary(_) => { + server_error::UnsupportedDataTypeSnafu { + data_type: origin, + reason: "not implemented", + } + .fail() } - .fail(), } } @@ -621,6 +960,7 @@ mod test { use common_time::interval::IntervalUnit; use common_time::timestamp::TimeUnit; + use common_time::Timestamp; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::value::ListValue; use pgwire::api::results::{FieldFormat, FieldInfo}; @@ -816,6 +1156,34 @@ mod test { Type::INTERVAL, FieldFormat::Text, ), + FieldInfo::new( + "int_list".into(), + None, + None, + Type::INT8_ARRAY, + FieldFormat::Text, + ), + FieldInfo::new( + "float_list".into(), + None, + None, + Type::FLOAT8_ARRAY, + FieldFormat::Text, + ), + FieldInfo::new( + "string_list".into(), + None, + None, + Type::VARCHAR_ARRAY, + FieldFormat::Text, + ), + FieldInfo::new( + "timestamp_list".into(), + None, + None, + Type::TIMESTAMP_ARRAY, + FieldFormat::Text, + ), ]; let datatypes = vec![ @@ -846,6 +1214,10 @@ mod test { ConcreteDataType::datetime_datatype(), ConcreteDataType::timestamp_datatype(TimeUnit::Second), ConcreteDataType::interval_datatype(IntervalUnit::YearMonth), + ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()), + ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()), + ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()), + ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()), ]; let values = vec![ Value::Null, @@ -875,6 +1247,22 @@ mod test { Value::DateTime(1000001i64.into()), Value::Timestamp(1000001i64.into()), Value::Interval(1000001i128.into()), + Value::List(ListValue::new( + vec![Value::Int64(1i64)], + ConcreteDataType::int64_datatype(), + )), + Value::List(ListValue::new( + vec![Value::Float64(1.0f64.into())], + ConcreteDataType::float64_datatype(), + )), + Value::List(ListValue::new( + vec![Value::String("tom".into())], + ConcreteDataType::string_datatype(), + )), + Value::List(ListValue::new( + vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))], + ConcreteDataType::timestamp_second_datatype(), + )), ]; let query_context = QueryContextBuilder::default() .configuration_parameter(Default::default()) @@ -884,22 +1272,6 @@ mod test { for (value, datatype) in values.iter().zip(datatypes) { encode_value(&query_context, value, &mut builder, &datatype).unwrap(); } - - let err = encode_value( - &query_context, - &Value::List(ListValue::new(vec![], ConcreteDataType::int16_datatype())), - &mut builder, - &ConcreteDataType::list_datatype(ConcreteDataType::int16_datatype()), - ) - .unwrap_err(); - match err { - PgWireError::ApiError(e) => { - assert!(format!("{e}").contains("Internal error:")); - } - _ => { - unreachable!() - } - } } #[test] diff --git a/src/servers/src/postgres/types/datetime.rs b/src/servers/src/postgres/types/datetime.rs index fc324c047b70..700f6dc2b53c 100644 --- a/src/servers/src/postgres/types/datetime.rs +++ b/src/servers/src/postgres/types/datetime.rs @@ -19,10 +19,10 @@ use postgres_types::{IsNull, ToSql, Type}; use session::session_config::{PGDateOrder, PGDateTimeStyle}; #[derive(Debug)] -pub struct StylingDate<'a>(pub &'a NaiveDate, pub PGDateTimeStyle, pub PGDateOrder); +pub struct StylingDate(pub NaiveDate, pub PGDateTimeStyle, pub PGDateOrder); #[derive(Debug)] -pub struct StylingDateTime<'a>(pub &'a NaiveDateTime, pub PGDateTimeStyle, pub PGDateOrder); +pub struct StylingDateTime(pub NaiveDateTime, pub PGDateTimeStyle, pub PGDateOrder); fn date_format_string(style: PGDateTimeStyle, order: PGDateOrder) -> &'static str { match style { @@ -53,7 +53,7 @@ fn datetime_format_string(style: PGDateTimeStyle, order: PGDateOrder) -> &'stati }, } } -impl ToSqlText for StylingDate<'_> { +impl ToSqlText for StylingDate { fn to_sql_text( &self, ty: &Type, @@ -78,7 +78,7 @@ impl ToSqlText for StylingDate<'_> { } } -impl ToSqlText for StylingDateTime<'_> { +impl ToSqlText for StylingDateTime { fn to_sql_text( &self, ty: &Type, @@ -112,7 +112,7 @@ impl ToSqlText for StylingDateTime<'_> { macro_rules! delegate_to_sql { ($delegator:ident, $delegatee:ident) => { - impl ToSql for $delegator<'_> { + impl ToSql for $delegator { fn to_sql( &self, ty: &Type, @@ -148,7 +148,7 @@ mod tests { let naive_date = NaiveDate::from_ymd_opt(1997, 12, 17).unwrap(); { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::ISO, PGDateOrder::MDY); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::ISO, PGDateOrder::MDY); let expected = "1997-12-17"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -157,7 +157,7 @@ mod tests { } { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::ISO, PGDateOrder::YMD); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::ISO, PGDateOrder::YMD); let expected = "1997-12-17"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -166,7 +166,7 @@ mod tests { } { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::ISO, PGDateOrder::DMY); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::ISO, PGDateOrder::DMY); let expected = "1997-12-17"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -175,7 +175,7 @@ mod tests { } { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::German, PGDateOrder::MDY); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::German, PGDateOrder::MDY); let expected = "17.12.1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -184,7 +184,7 @@ mod tests { } { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::German, PGDateOrder::YMD); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::German, PGDateOrder::YMD); let expected = "17.12.1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -193,7 +193,7 @@ mod tests { } { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::German, PGDateOrder::DMY); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::German, PGDateOrder::DMY); let expected = "17.12.1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -202,8 +202,7 @@ mod tests { } { - let styling_date = - StylingDate(&naive_date, PGDateTimeStyle::Postgres, PGDateOrder::MDY); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::Postgres, PGDateOrder::MDY); let expected = "12-17-1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -212,8 +211,7 @@ mod tests { } { - let styling_date = - StylingDate(&naive_date, PGDateTimeStyle::Postgres, PGDateOrder::YMD); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::Postgres, PGDateOrder::YMD); let expected = "12-17-1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -222,8 +220,7 @@ mod tests { } { - let styling_date = - StylingDate(&naive_date, PGDateTimeStyle::Postgres, PGDateOrder::DMY); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::Postgres, PGDateOrder::DMY); let expected = "17-12-1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -232,7 +229,7 @@ mod tests { } { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::SQL, PGDateOrder::MDY); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::SQL, PGDateOrder::MDY); let expected = "12/17/1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -241,7 +238,7 @@ mod tests { } { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::SQL, PGDateOrder::YMD); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::SQL, PGDateOrder::YMD); let expected = "12/17/1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -250,7 +247,7 @@ mod tests { } { - let styling_date = StylingDate(&naive_date, PGDateTimeStyle::SQL, PGDateOrder::DMY); + let styling_date = StylingDate(naive_date, PGDateTimeStyle::SQL, PGDateOrder::DMY); let expected = "17/12/1997"; let mut out = bytes::BytesMut::new(); let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap(); @@ -266,7 +263,7 @@ mod tests { .unwrap(); { - let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::ISO, PGDateOrder::MDY); + let styling_datetime = StylingDateTime(input, PGDateTimeStyle::ISO, PGDateOrder::MDY); let expected = "2021-09-01 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -277,7 +274,7 @@ mod tests { } { - let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::ISO, PGDateOrder::YMD); + let styling_datetime = StylingDateTime(input, PGDateTimeStyle::ISO, PGDateOrder::YMD); let expected = "2021-09-01 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -288,7 +285,7 @@ mod tests { } { - let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::ISO, PGDateOrder::DMY); + let styling_datetime = StylingDateTime(input, PGDateTimeStyle::ISO, PGDateOrder::DMY); let expected = "2021-09-01 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -300,7 +297,7 @@ mod tests { { let styling_datetime = - StylingDateTime(&input, PGDateTimeStyle::German, PGDateOrder::MDY); + StylingDateTime(input, PGDateTimeStyle::German, PGDateOrder::MDY); let expected = "01.09.2021 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -312,7 +309,7 @@ mod tests { { let styling_datetime = - StylingDateTime(&input, PGDateTimeStyle::German, PGDateOrder::YMD); + StylingDateTime(input, PGDateTimeStyle::German, PGDateOrder::YMD); let expected = "01.09.2021 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -324,7 +321,7 @@ mod tests { { let styling_datetime = - StylingDateTime(&input, PGDateTimeStyle::German, PGDateOrder::DMY); + StylingDateTime(input, PGDateTimeStyle::German, PGDateOrder::DMY); let expected = "01.09.2021 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -336,7 +333,7 @@ mod tests { { let styling_datetime = - StylingDateTime(&input, PGDateTimeStyle::Postgres, PGDateOrder::MDY); + StylingDateTime(input, PGDateTimeStyle::Postgres, PGDateOrder::MDY); let expected = "Wed Sep 01 12:34:56.789012 2021"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -348,7 +345,7 @@ mod tests { { let styling_datetime = - StylingDateTime(&input, PGDateTimeStyle::Postgres, PGDateOrder::YMD); + StylingDateTime(input, PGDateTimeStyle::Postgres, PGDateOrder::YMD); let expected = "Wed Sep 01 12:34:56.789012 2021"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -360,7 +357,7 @@ mod tests { { let styling_datetime = - StylingDateTime(&input, PGDateTimeStyle::Postgres, PGDateOrder::DMY); + StylingDateTime(input, PGDateTimeStyle::Postgres, PGDateOrder::DMY); let expected = "Wed 01 Sep 12:34:56.789012 2021"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -371,7 +368,7 @@ mod tests { } { - let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::SQL, PGDateOrder::MDY); + let styling_datetime = StylingDateTime(input, PGDateTimeStyle::SQL, PGDateOrder::MDY); let expected = "09/01/2021 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -382,7 +379,7 @@ mod tests { } { - let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::SQL, PGDateOrder::YMD); + let styling_datetime = StylingDateTime(input, PGDateTimeStyle::SQL, PGDateOrder::YMD); let expected = "09/01/2021 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime @@ -393,7 +390,7 @@ mod tests { } { - let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::SQL, PGDateOrder::DMY); + let styling_datetime = StylingDateTime(input, PGDateTimeStyle::SQL, PGDateOrder::DMY); let expected = "01/09/2021 12:34:56.789012"; let mut out = bytes::BytesMut::new(); let is_null = styling_datetime diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index 1e87c54e5f31..19acc37ea6c6 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -69,6 +69,7 @@ macro_rules! sql_tests { test_postgres_bytea, test_postgres_datestyle, test_postgres_parameter_inference, + test_postgres_array_types, test_mysql_prepare_stmt_insert_timestamp, ); )* @@ -1111,3 +1112,34 @@ pub async fn test_mysql_prepare_stmt_insert_timestamp(store_type: StorageType) { let _ = server.shutdown().await; guard.remove_all().await; } + +pub async fn test_postgres_array_types(store_type: StorageType) { + let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_inference").await; + + let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls) + .await + .unwrap(); + + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + connection.await.unwrap(); + tx.send(()).unwrap(); + }); + + let rows = client + .query( + "SELECT arrow_cast(1, 'List(Int8)'), arrow_cast('tom', 'List(Utf8)'), arrow_cast(3.14, 'List(Float32)'), arrow_cast('2023-01-02T12:53:02', 'List(Timestamp(Millisecond, None))')", + &[], + ) + .await + .unwrap(); + + assert_eq!(1, rows.len()); + + // Shutdown the client. + drop(client); + rx.await.unwrap(); + + let _ = fe_pg_server.shutdown().await; + guard.remove_all().await; +} diff --git a/tests/cases/standalone/common/function/geo.result b/tests/cases/standalone/common/function/geo.result index 3a63b5890b59..895444765032 100644 --- a/tests/cases/standalone/common/function/geo.result +++ b/tests/cases/standalone/common/function/geo.result @@ -158,11 +158,11 @@ SELECT geohash(37.76938, -122.3889, 11); SELECT geohash(37.76938, -122.3889, 100); -Error: 3001(EngineExecuteQuery), Geohash error: Invalid length specified: 100. Accepted values are between 1 and 12, inclusive +Error: 3001(EngineExecuteQuery), Invalid geohash resolution 100, expect value: [1, 12] SELECT geohash(37.76938, -122.3889, -1); -Error: 3001(EngineExecuteQuery), Geohash error: Invalid length specified: 18446744073709551615. Accepted values are between 1 and 12, inclusive +Error: 3001(EngineExecuteQuery), Invalid geohash resolution -1, expect value: [1, 12] SELECT geohash(37.76938, -122.3889, 11::Int8); @@ -228,3 +228,11 @@ SELECT geohash(37.76938, -122.3889, 11::UInt64); | 9q8yygxneft | +------------------------------------------------------------------------------------+ +SELECT geohash_neighbours(37.76938, -122.3889, 11); + ++----------------------------------------------------------------------------------------------------------+ +| geohash_neighbours(Float64(37.76938),Float64(-122.3889),Int64(11)) | ++----------------------------------------------------------------------------------------------------------+ +| [9q8yygxnefv, 9q8yygxnefu, 9q8yygxnefs, 9q8yygxnefk, 9q8yygxnefm, 9q8yygxnefq, 9q8yygxnefw, 9q8yygxnefy] | ++----------------------------------------------------------------------------------------------------------+ + diff --git a/tests/cases/standalone/common/function/geo.sql b/tests/cases/standalone/common/function/geo.sql index 3a0e668acc06..be2b3947bbc8 100644 --- a/tests/cases/standalone/common/function/geo.sql +++ b/tests/cases/standalone/common/function/geo.sql @@ -64,3 +64,5 @@ SELECT geohash(37.76938, -122.3889, 11::UInt16); SELECT geohash(37.76938, -122.3889, 11::UInt32); SELECT geohash(37.76938, -122.3889, 11::UInt64); + +SELECT geohash_neighbours(37.76938, -122.3889, 11);