Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable normalization for configuration options and preserve case for S3 paths #13576

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,13 @@ mod tests {

#[tokio::test]
async fn s3_object_store_builder() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
// "fake" is uppercase to ensure the values are not lowercased when parsed
let access_key_id = "FAKE_access_key_id";
let secret_access_key = "FAKE_secret_access_key";
let region = "fake_us-east-2";
let endpoint = "endpoint33";
let session_token = "fake_session_token";
let location = "s3://bucket/path/file.parquet";
let session_token = "FAKE_session_token";
let location = "s3://bucket/path/FAKE/file.parquet";

let table_url = ListingTableUrl::parse(location)?;
let scheme = table_url.scheme();
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ half = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
libc = "0.2.140"
log = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
Expand Down
80 changes: 56 additions & 24 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

Expand All @@ -29,7 +30,9 @@ use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
Expand All @@ -38,7 +41,7 @@ use crate::{DataFusionError, Result};
/// /// Amazing config
/// pub struct MyConfig {
/// /// Field 1 doc
/// field1: String, default = "".to_string()
/// field1: String, transform = str::to_lowercase, default = "".to_string()
///
/// /// Field 2 doc
/// field2: usize, default = 232
Expand Down Expand Up @@ -67,9 +70,12 @@ use crate::{DataFusionError, Result};
/// fn set(&mut self, key: &str, value: &str) -> Result<()> {
/// let (key, rem) = key.split_once('.').unwrap_or((key, ""));
/// match key {
/// "field1" => self.field1.set(rem, value),
/// "field2" => self.field2.set(rem, value),
/// "field3" => self.field3.set(rem, value),
/// "field1" => {
/// let value = str::to_lowercase(value);
/// self.field1.set(rem, value.as_ref())
/// },
/// "field2" => self.field2.set(rem, value.as_ref()),
/// "field3" => self.field3.set(rem, value.as_ref()),
/// _ => _internal_err!(
/// "Config value \"{}\" not found on MyConfig",
/// key
Expand Down Expand Up @@ -102,15 +108,14 @@ use crate::{DataFusionError, Result};
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
///
#[macro_export]
macro_rules! config_namespace {
(
$(#[doc = $struct_d:tt])*
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
$field_vis:vis $field_name:ident : $field_type:ty, $(warn = $warn: expr,)? $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
Expand All @@ -127,9 +132,14 @@ macro_rules! config_namespace {
impl ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));

match key {
$(
stringify!($field_name) => self.$field_name.set(rem, value),
stringify!($field_name) => {
$(let value = $transform(value);)?
$(log::warn!($warn);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => return _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
Expand Down Expand Up @@ -211,12 +221,15 @@ config_namespace! {
/// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
pub enable_ident_normalization: bool, default = true

/// When set to true, SQL parser will normalize options value (convert value to lowercase)
pub enable_options_value_normalization: bool, default = true
/// When set to true, SQL parser will normalize options value (convert value to lowercase).
/// Note that this option is ignored and will be removed in the future. All case-insensitive values
/// are normalized automatically.
pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

/// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
/// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi.
pub dialect: String, default = "generic".to_string()
// no need to lowercase because [`sqlparser::dialect_from_str`] is case-insensitive

/// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
/// ignore the length. If false, error if a `VARCHAR` with a length is
Expand Down Expand Up @@ -431,7 +444,7 @@ config_namespace! {
///
/// Note that this default setting is not the same as
/// the default parquet writer setting.
pub compression: Option<String>, default = Some("zstd(3)".into())
pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

/// (writing) Sets if dictionary encoding is enabled. If NULL, uses
/// default parquet writer setting
Expand All @@ -444,7 +457,7 @@ config_namespace! {
/// Valid values are: "none", "chunk", and "page"
/// These values are not case sensitive. If NULL, uses
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the "These values are not case sensitive" note can be removed

/// default parquet writer setting
pub statistics_enabled: Option<String>, default = Some("page".into())
pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

/// (writing) Sets max statistics size for any column. If NULL, uses
/// default parquet writer setting
Expand All @@ -470,7 +483,7 @@ config_namespace! {
/// delta_byte_array, rle_dictionary, and byte_stream_split.
/// These values are not case sensitive. If NULL, uses
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These values are not case sensitive.

this suggests the code reading this value does normalization and that it's no longer needed, can be simplified

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think this is good info for users — that you can assign any value to it: "plain", "PLAIN", "Plain", etc. Also, I think that's what Oleks asked in #13576 (comment) - so maybe let's keep it as is?

/// default parquet writer setting
pub encoding: Option<String>, default = None
pub encoding: Option<String>, transform = str::to_lowercase, default = None
blaginin marked this conversation as resolved.
Show resolved Hide resolved

/// (writing) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true
Expand Down Expand Up @@ -971,29 +984,45 @@ impl<F: ConfigField + Default> ConfigField for Option<F> {
}
}

fn default_transform<T>(input: &str) -> Result<T>
where
T: FromStr,
<T as FromStr>::Err: Sync + Send + Error + 'static,
{
input.parse().map_err(|e| {
DataFusionError::Context(
format!(
"Error parsing '{}' as {}",
input,
std::any::type_name::<T>()
),
Box::new(DataFusionError::External(Box::new(e))),
)
})
}

#[macro_export]
macro_rules! config_field {
($t:ty) => {
config_field!($t, value => default_transform(value)?);
};

($t:ty, $arg:ident => $transform:expr) => {
impl ConfigField for $t {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}

fn set(&mut self, _: &str, value: &str) -> Result<()> {
*self = value.parse().map_err(|e| {
DataFusionError::Context(
format!(concat!("Error parsing {} as ", stringify!($t),), value),
Box::new(DataFusionError::External(Box::new(e))),
)
})?;
fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
*self = $transform;
Ok(())
}
}
};
}

config_field!(String);
config_field!(bool);
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
Expand Down Expand Up @@ -1508,7 +1537,7 @@ macro_rules! config_namespace_with_hashmap {
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
$field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
Expand All @@ -1527,7 +1556,10 @@ macro_rules! config_namespace_with_hashmap {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => self.$field_name.set(rem, value),
stringify!($field_name) => {
$(let value = $transform(value);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
Expand Down Expand Up @@ -1606,7 +1638,7 @@ config_namespace_with_hashmap! {
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case-sensitive. If NULL, uses
/// default parquet options
pub compression: Option<String>, default = None
pub compression: Option<String>, transform = str::to_lowercase, default = None

/// Sets if statistics are enabled for the column
/// Valid values are: "none", "chunk", and "page"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl TableProviderFactory for StreamTableFactory {
let header = if let Ok(opt) = cmd
.options
.get("format.has_header")
.map(|has_header| bool::from_str(has_header))
.map(|has_header| bool::from_str(has_header.to_lowercase().as_str()))
.transpose()
{
opt.unwrap_or(false)
Expand Down
17 changes: 13 additions & 4 deletions datafusion/core/tests/config_from_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ use std::env;
fn from_env() {
// Note: these must be a single test to avoid interference from concurrent execution
let env_key = "DATAFUSION_OPTIMIZER_FILTER_NULL_JOIN_KEYS";
env::set_var(env_key, "true");
let config = ConfigOptions::from_env().unwrap();
// test that valid boolean values are accepted regardless of letter case
for bool_option in ["true", "TRUE", "True", "tRUe"] {
env::set_var(env_key, bool_option);
let config = ConfigOptions::from_env().unwrap();
env::remove_var(env_key);
assert!(config.optimizer.filter_null_join_keys);
}

// invalid testing
env::set_var(env_key, "ttruee");
let err = ConfigOptions::from_env().unwrap_err().strip_backtrace();
assert_eq!(err, "Error parsing 'ttruee' as bool\ncaused by\nExternal error: provided string was not `true` or `false`");
env::remove_var(env_key);
assert!(config.optimizer.filter_null_join_keys);

let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";

Expand All @@ -37,7 +46,7 @@ fn from_env() {
// for invalid testing
env::set_var(env_key, "abc");
let err = ConfigOptions::from_env().unwrap_err().strip_backtrace();
assert_eq!(err, "Error parsing abc as usize\ncaused by\nExternal error: invalid digit found in string");
assert_eq!(err, "Error parsing 'abc' as usize\ncaused by\nExternal error: invalid digit found in string");

env::remove_var(env_key);
let config = ConfigOptions::from_env().unwrap();
Expand Down
35 changes: 3 additions & 32 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ use arrow_schema::*;
use datafusion_common::{
field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError,
};
use sqlparser::ast::TimezoneInfo;
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
use sqlparser::ast::{TimezoneInfo, Value};

use datafusion_common::TableReference;
use datafusion_common::{
Expand All @@ -38,7 +38,7 @@ use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::utils::find_column_exprs;
use datafusion_expr::{col, Expr};

use crate::utils::{make_decimal_type, value_to_string};
use crate::utils::make_decimal_type;
pub use datafusion_expr::planner::ContextProvider;

/// SQL parser options
Expand All @@ -56,7 +56,7 @@ impl Default for ParserOptions {
parse_float_as_decimal: false,
enable_ident_normalization: true,
support_varchar_with_length: true,
enable_options_value_normalization: true,
enable_options_value_normalization: false,
}
}
}
Expand Down Expand Up @@ -87,32 +87,6 @@ impl IdentNormalizer {
}
}

/// Value Normalizer
#[derive(Debug)]
pub struct ValueNormalizer {
normalize: bool,
}

impl Default for ValueNormalizer {
fn default() -> Self {
Self { normalize: true }
}
}

impl ValueNormalizer {
pub fn new(normalize: bool) -> Self {
Self { normalize }
}

pub fn normalize(&self, value: Value) -> Option<String> {
match (value_to_string(&value), self.normalize) {
(Some(s), true) => Some(s.to_ascii_lowercase()),
(Some(s), false) => Some(s),
(None, _) => None,
}
}
}

/// Struct to store the states used by the Planner. The Planner will leverage the states to resolve
/// CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
Expand Down Expand Up @@ -254,7 +228,6 @@ pub struct SqlToRel<'a, S: ContextProvider> {
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) ident_normalizer: IdentNormalizer,
pub(crate) value_normalizer: ValueNormalizer,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand All @@ -266,13 +239,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// Create a new query planner
pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
let ident_normalize = options.enable_ident_normalization;
let options_value_normalize = options.enable_options_value_normalization;

SqlToRel {
context_provider,
options,
ident_normalizer: IdentNormalizer::new(ident_normalize),
value_normalizer: ValueNormalizer::new(options_value_normalize),
}
}

Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1386,8 +1386,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
return plan_err!("Option {key} is specified multiple times");
}

let Some(value_string) = self.value_normalizer.normalize(value.clone())
else {
let Some(value_string) = crate::utils::value_to_string(&value) else {
return plan_err!("Unsupported Value {}", value);
};

Expand Down
Loading
Loading