
Commit

Merge branch 'main' into add-boolean-columns
jonathanc-n authored Nov 11, 2024
2 parents 684a905 + be19afc commit 4f374a4
Showing 31 changed files with 712 additions and 99 deletions.
29 changes: 29 additions & 0 deletions datafusion/core/benches/sql_planner.rs
@@ -24,8 +24,10 @@ mod data_utils;

use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use std::fs::File;
use std::io::{BufRead, BufReader};
@@ -122,6 +124,29 @@ fn register_clickbench_hits_table() -> SessionContext {
ctx
}

/// Target of this benchmark: ensure that placeholder replacement does not get slower
/// when the query contains no placeholders at all.
fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) {
const COLUMNS_NUM: usize = 200;
let mut aggregates = String::new();
for i in 0..COLUMNS_NUM {
if i > 0 {
aggregates.push_str(", ");
}
aggregates.push_str(format!("MAX(a{})", i).as_str());
}
// SELECT MAX(a0), ..., MAX(a199) FROM t1.
let query = format!("SELECT {} FROM t1", aggregates);
let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap();
let rt = Runtime::new().unwrap();
let plan =
rt.block_on(async { ctx.state().statement_to_plan(statement).await.unwrap() });
b.iter(|| {
let plan = plan.clone();
criterion::black_box(plan.with_param_values(vec![ScalarValue::from(1)]).unwrap());
});
}

fn criterion_benchmark(c: &mut Criterion) {
// verify that we can load the clickbench data prior to running the benchmark
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
@@ -388,6 +413,10 @@ fn criterion_benchmark(c: &mut Criterion) {
}
})
});

c.bench_function("with_param_values_many_columns", |b| {
benchmark_with_param_values_many_columns(&ctx, b);
});
}

criterion_group!(benches, criterion_benchmark);
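For readers unfamiliar with the API being benchmarked, the sketch below is not part of the commit; it applies the same sql_to_statement / statement_to_plan / with_param_values sequence to a query that does contain a placeholder, for contrast with the no-placeholder path the benchmark measures. The placeholder query itself is an assumption for illustration; the table t1 and column a0 match the benchmark's generated schema.

use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;

async fn bind_placeholder_example(ctx: &SessionContext) -> Result<()> {
    // Assumes a table `t1` with an integer column `a0` is already registered on `ctx`.
    let statement = ctx
        .state()
        .sql_to_statement("SELECT a0 FROM t1 WHERE a0 > $1", "Generic")?;
    let plan = ctx.state().statement_to_plan(statement).await?;
    // Replace the `$1` placeholder with a concrete value. The benchmark above runs this
    // call on a plan with no placeholders, to check that the traversal stays cheap.
    let _bound = plan.with_param_values(vec![ScalarValue::from(1i32)])?;
    Ok(())
}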
6 changes: 6 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
@@ -715,6 +715,12 @@ impl SessionContext {
LogicalPlan::Statement(Statement::Execute(execute)) => {
self.execute_prepared(execute)
}
LogicalPlan::Statement(Statement::Deallocate(deallocate)) => {
self.state
.write()
.remove_prepared(deallocate.name.as_str())?;
self.return_empty_dataframe()
}
plan => Ok(DataFrame::new(self.state(), plan)),
}
}
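For context, and not part of the diff: with this arm in place, a DEALLOCATE statement submitted through SessionContext::sql should drop a previously prepared plan. A rough usage sketch, assuming the PREPARE/EXECUTE handling implied by the surrounding match arms; the statement name my_query and the prepared query are illustrative only.

use datafusion::error::Result;
use datafusion::execution::context::SessionContext;

async fn prepare_then_deallocate() -> Result<()> {
    let ctx = SessionContext::new();
    // PREPARE stores the plan in the session state under the given name.
    ctx.sql("PREPARE my_query(INT) AS SELECT $1 + 1").await?;
    // ... EXECUTE my_query(42) as needed ...
    // DEALLOCATE now routes to SessionState::remove_prepared and returns an empty DataFrame.
    ctx.sql("DEALLOCATE my_query").await?;
    Ok(())
}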
11 changes: 11 additions & 0 deletions datafusion/core/src/execution/session_state.rs
@@ -934,6 +934,17 @@ impl SessionState {
pub(crate) fn get_prepared(&self, name: &str) -> Option<Arc<PreparedPlan>> {
self.prepared_plans.get(name).map(Arc::clone)
}

/// Remove the prepared plan with the given name.
pub(crate) fn remove_prepared(
&mut self,
name: &str,
) -> datafusion_common::Result<()> {
match self.prepared_plans.remove(name) {
Some(_) => Ok(()),
None => exec_err!("Prepared statement '{}' does not exist", name),
}
}
}

/// A builder to be used for building [`SessionState`]'s. Defaults will
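One behavioral detail, offered as an assumption-level sketch rather than something the commit asserts: because remove_prepared reports a missing name as an error, deallocating the same statement twice should fail the second time.

use datafusion::error::Result;
use datafusion::execution::context::SessionContext;

async fn deallocate_twice(ctx: &SessionContext) -> Result<()> {
    // Assumes `my_query` was prepared earlier in this session.
    ctx.sql("DEALLOCATE my_query").await?; // removes the stored plan
    // The name is gone now, so remove_prepared returns exec_err! and this call fails.
    assert!(ctx.sql("DEALLOCATE my_query").await.is_err());
    Ok(())
}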
20 changes: 19 additions & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -50,6 +50,7 @@ use datafusion_common::HashMap;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use rand::rngs::StdRng;
use rand::{thread_rng, Rng, SeedableRng};
use std::str;
use tokio::task::JoinSet;

// ========================================================================
@@ -171,6 +172,21 @@ fn baseline_config() -> DatasetGeneratorConfig {
ColumnDescr::new("time32_ms", DataType::Time32(TimeUnit::Millisecond)),
ColumnDescr::new("time64_us", DataType::Time64(TimeUnit::Microsecond)),
ColumnDescr::new("time64_ns", DataType::Time64(TimeUnit::Nanosecond)),
ColumnDescr::new("timestamp_s", DataType::Timestamp(TimeUnit::Second, None)),
ColumnDescr::new(
"timestamp_ms",
DataType::Timestamp(TimeUnit::Millisecond, None),
),
ColumnDescr::new(
"timestamp_us",
DataType::Timestamp(TimeUnit::Microsecond, None),
),
ColumnDescr::new(
"timestamp_ns",
DataType::Timestamp(TimeUnit::Nanosecond, None),
),
ColumnDescr::new("float32", DataType::Float32),
ColumnDescr::new("float64", DataType::Float64),
ColumnDescr::new(
"interval_year_month",
DataType::Interval(IntervalUnit::YearMonth),
@@ -206,11 +222,13 @@ fn baseline_config() -> DatasetGeneratorConfig {
ColumnDescr::new("utf8", DataType::Utf8),
ColumnDescr::new("largeutf8", DataType::LargeUtf8),
ColumnDescr::new("utf8view", DataType::Utf8View),
// low cardinality columns
ColumnDescr::new("u8_low", DataType::UInt8).with_max_num_distinct(10),
ColumnDescr::new("utf8_low", DataType::Utf8).with_max_num_distinct(10),
ColumnDescr::new("bool", DataType::Boolean),
ColumnDescr::new("binary", DataType::Binary),
ColumnDescr::new("large_binary", DataType::LargeBinary),
ColumnDescr::new("binaryview", DataType::BinaryView),
];

let min_num_rows = 512;
124 changes: 115 additions & 9 deletions datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
@@ -18,11 +18,13 @@
use std::sync::Arc;

use arrow::datatypes::{
BinaryType, BinaryViewType, BooleanType, ByteArrayType, ByteViewType, Date32Type, Date64Type,
Decimal128Type, Decimal256Type, Float32Type, Float64Type, Int16Type, Int32Type,
Int64Type, Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType,
IntervalYearMonthType, LargeBinaryType, LargeUtf8Type, StringViewType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type,
};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};
@@ -36,7 +38,8 @@ use rand::{
};
use test_utils::{
array_gen::{
BooleanArrayGenerator, DecimalArrayGenerator, PrimitiveArrayGenerator,
BinaryArrayGenerator, BooleanArrayGenerator, DecimalArrayGenerator,
PrimitiveArrayGenerator, StringArrayGenerator,
},
stagger_batch,
@@ -74,17 +77,19 @@ pub struct DatasetGeneratorConfig {
}

impl DatasetGeneratorConfig {
/// Return a list of all column names
pub fn all_columns(&self) -> Vec<&str> {
self.columns.iter().map(|d| d.name.as_str()).collect()
}

/// Return a list of column names that are "numeric"
pub fn numeric_columns(&self) -> Vec<&str> {
self.columns
.iter()
.filter_map(|d| {
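// Presumably excluded so fuzz comparisons avoid inexact floating-point aggregates.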
if d.column_type.is_numeric()
    && !matches!(d.column_type, DataType::Float32 | DataType::Float64)
{
Some(d.name.as_str())
} else {
None
@@ -301,6 +306,37 @@ macro_rules! generate_primitive_array {
}};
}

macro_rules! generate_binary_array {
(
$SELF:ident,
$NUM_ROWS:ident,
$MAX_NUM_DISTINCT:expr,
$BATCH_GEN_RNG:ident,
$ARRAY_GEN_RNG:ident,
$ARROW_TYPE:ident
) => {{
let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len());
let null_pct = $SELF.candidate_null_pcts[null_pct_idx];

let max_len = $BATCH_GEN_RNG.gen_range(1..100);

let mut generator = BinaryArrayGenerator {
max_len,
num_binaries: $NUM_ROWS,
num_distinct_binaries: $MAX_NUM_DISTINCT,
null_pct,
rng: $ARRAY_GEN_RNG,
};

match $ARROW_TYPE::DATA_TYPE {
DataType::Binary => generator.gen_data::<i32>(),
DataType::LargeBinary => generator.gen_data::<i64>(),
DataType::BinaryView => generator.gen_binary_view(),
_ => unreachable!(),
}
}};
}

impl RecordBatchGenerator {
fn new(min_rows_nun: usize, max_rows_num: usize, columns: Vec<ColumnDescr>) -> Self {
let candidate_null_pcts = vec![0.0, 0.01, 0.1, 0.5];
@@ -550,6 +586,76 @@ impl RecordBatchGenerator {
IntervalMonthDayNanoType
)
}
DataType::Timestamp(TimeUnit::Second, None) => {
generate_primitive_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
TimestampSecondType
)
}
DataType::Timestamp(TimeUnit::Millisecond, None) => {
generate_primitive_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
TimestampMillisecondType
)
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
generate_primitive_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
TimestampMicrosecondType
)
}
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
generate_primitive_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
TimestampNanosecondType
)
}
DataType::Binary => {
generate_binary_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
BinaryType
)
}
DataType::LargeBinary => {
generate_binary_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
LargeBinaryType
)
}
DataType::BinaryView => {
generate_binary_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
BinaryViewType
)
}
DataType::Decimal128(precision, scale) => {
generate_decimal_array!(
self,