Move Count to functions-aggregate, update MSRV to rust 1.75 #10484

Merged Jun 12, 2024 (81 commits)

Commits (all by jayzhan211)
- 755215d mv accumulate indices (May 10, 2024)
- 12366e6 complete udaf (May 10, 2024)
- 9f43638 register (May 11, 2024)
- 7dd2aae fix expr (May 11, 2024)
- 2fb0c2a filter distinct count (May 11, 2024)
- 9dd044c todo: need to move count distinct too (May 12, 2024)
- ab9eaaa move code around (May 5, 2024)
- 1506453 move distinct to aggr-crate (May 12, 2024)
- 6a554ee replace (May 12, 2024)
- dae3061 Merge branch 'count-udaf' into count-for-all (May 12, 2024)
- a44c967 backup (May 12, 2024)
- dd5a90f fix function name and physical expr (May 13, 2024)
- c9eebb3 fix physical optimizer (May 13, 2024)
- 6375d47 fix all slt (May 13, 2024)
- 37d8974 cleanup (May 13, 2024)
- 643ffe9 Merge remote-tracking branch 'upstream/main' into count-for-all (May 13, 2024)
- 9917b37 cleanup (May 13, 2024)
- c6ba32b fix with args (May 13, 2024)
- 2d80a8b add label (May 13, 2024)
- 2bcb10b revert builtin related code back (May 13, 2024)
- 12f27c4 fix test (May 13, 2024)
- a391407 fix substrait (May 14, 2024)
- 3ace4bf fix doc (May 14, 2024)
- dc8ddd7 fmy (May 14, 2024)
- 8278d7b Merge remote-tracking branch 'upstream/main' into count-for-all (May 15, 2024)
- 5765d99 fix (May 15, 2024)
- ea81c6e Merge remote-tracking branch 'upstream/main' into count-for-all (May 16, 2024)
- d55abb4 fix udaf macro for distinct but not apply (May 16, 2024)
- 2e051d1 fmt (May 16, 2024)
- 5a08c3a fix count distinct and use workspace (May 16, 2024)
- facbbc5 add reverse (May 16, 2024)
- c32b2ef remove old code (May 16, 2024)
- 93d05e5 backup (May 17, 2024)
- eb664fb use macro (May 17, 2024)
- 14ef6dc Merge remote-tracking branch 'upstream/main' into count-for-all (May 17, 2024)
- 406f432 Merge branch 'expr-builder' into count-for-all (May 17, 2024)
- 5e5d17f expr builder (May 17, 2024)
- fbb87c6 introduce expr builder (May 17, 2024)
- 800f006 add example (May 18, 2024)
- dee9417 fmt (May 18, 2024)
- c453e40 Merge remote-tracking branch 'upstream/main' into count-for-all (May 18, 2024)
- 1b92b2b clean agg sta (May 18, 2024)
- 330b324 combine agg (May 18, 2024)
- ef06589 Merge branch 'expr-builder' into count-for-all (May 18, 2024)
- 92accf9 limit distinct and fmt (May 18, 2024)
- 10d92e1 cleanup name (May 18, 2024)
- 7d541ef fix ci (May 18, 2024)
- f09a982 fix window (May 18, 2024)
- bcc15ee fmt (May 18, 2024)
- 07824a1 fix ci (May 18, 2024)
- cd40d7f fmt (May 18, 2024)
- 1821d18 Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 3, 2024)
- d652322 fix merged (Jun 3, 2024)
- 38e0243 fix (Jun 3, 2024)
- b469a25 Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 3, 2024)
- 1735bf0 fix rebase (Jun 3, 2024)
- de8071d cleanup (Jun 3, 2024)
- 01edaf3 use std (Jun 3, 2024)
- 4d1c7f7 update mrsv (Jun 3, 2024)
- c98e925 upd msrv (Jun 3, 2024)
- cf8ba04 revert test (Jun 4, 2024)
- ecc7364 Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 4, 2024)
- b823aad fmt (Jun 4, 2024)
- 2d8cfc0 downgrade to 1.75 (Jun 4, 2024)
- 44e0328 1.76 (Jun 5, 2024)
- 01a7325 ahas (Jun 5, 2024)
- ad518a7 revert to 1.75 (Jun 5, 2024)
- fec064a Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 5, 2024)
- 439577a Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 5, 2024)
- ea864e0 rm count (Jun 5, 2024)
- d621fb6 Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 6, 2024)
- b3f8f8c Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 6, 2024)
- 121e4cb Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 9, 2024)
- d89424d fix merge (Jun 9, 2024)
- a88916d Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 10, 2024)
- 0924248 fmt (Jun 10, 2024)
- 33a02f9 Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 12, 2024)
- 4b13fc0 clippy (Jun 12, 2024)
- 4bd680d rm sum in test_no_duplicate_name (Jun 12, 2024)
- cf6f9ac Merge remote-tracking branch 'upstream/main' into count-for-all (Jun 12, 2024)
- 97a1227 fix (Jun 12, 2024)

Files changed

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -52,7 +52,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.73"
rust-version = "1.75"

Contributor comment: By my reading of https://github.com/apache/datafusion?tab=readme-ov-file#rust-version-compatibility-policy, since 1.74 was released in Nov 2023, we can safely update in this PR.

version = "39.0.0"

[workspace.dependencies]
@@ -107,7 +107,7 @@ doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
hashbrown = { version = "0.14", features = ["raw"] }
hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

(generated file; diff not rendered)

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.73"
rust-version = "1.75"
readme = "README.md"

[dependencies]
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -30,7 +30,7 @@ authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
# "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
# https://github.com/foresterre/cargo-msrv/issues/590
rust-version = "1.73"
rust-version = "1.75"

[lints]
workspace = true
13 changes: 5 additions & 8 deletions datafusion/core/src/dataframe/mod.rs
@@ -50,12 +50,11 @@ use datafusion_common::{
};
use datafusion_expr::lit;
use datafusion_expr::{
avg, count, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
avg, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
UNNAMED_TABLE,
};
use datafusion_expr::{case, is_null};
use datafusion_functions_aggregate::expr_fn::sum;
use datafusion_functions_aggregate::expr_fn::{median, stddev};
use datafusion_functions_aggregate::expr_fn::{count, median, stddev, sum};

use async_trait::async_trait;

@@ -854,10 +853,7 @@ impl DataFrame {
/// ```
pub async fn count(self) -> Result<usize> {
let rows = self
.aggregate(
vec![],
vec![datafusion_expr::count(Expr::Literal(COUNT_STAR_EXPANSION))],
)?
.aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?
.collect()
.await?;
let len = *rows
@@ -1594,9 +1590,10 @@ mod tests {
use datafusion_common::{Constraint, Constraints};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
array_agg, cast, count_distinct, create_udf, expr, lit, BuiltInWindowFunction,
array_agg, cast, create_udf, expr, lit, BuiltInWindowFunction,
ScalarFunctionImplementation, Volatility, WindowFrame, WindowFunctionDefinition,
};
use datafusion_functions_aggregate::expr_fn::count_distinct;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

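
For users of the DataFrame and Expr APIs, the visible effect of the dataframe changes above is an import move: `count` and `count_distinct` now come from `datafusion_functions_aggregate::expr_fn` rather than `datafusion_expr`. A minimal sketch of downstream usage after this PR; the table `t` and column `a` are hypothetical, registered elsewhere:

```rust
use datafusion::error::Result;
use datafusion::prelude::{col, lit, SessionContext};
// Previously: use datafusion_expr::{count, count_distinct};
use datafusion_functions_aggregate::expr_fn::{count, count_distinct};

async fn count_rows(ctx: &SessionContext) -> Result<()> {
    // Hypothetical table `t` with a column `a`.
    let df = ctx.table("t").await?;
    let counts = df.aggregate(
        vec![], // no GROUP BY
        vec![
            count(lit(1)).alias("n_rows"),                  // COUNT(1)
            count_distinct(col("a")).alias("n_distinct_a"), // COUNT(DISTINCT a)
        ],
    )?;
    counts.show().await?;
    Ok(())
}
```
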
79 changes: 27 additions & 52 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -170,38 +170,6 @@ fn take_optimizable_column_and_table_count(
}
}
}
// TODO: Remove this after revmoing Builtin Count
else if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
&stats.num_rows,
agg_expr.as_any().downcast_ref::<expressions::Count>(),
) {
// TODO implementing Eq on PhysicalExpr would help a lot here
if casted_expr.expressions().len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Column>()
{
let current_val = &col_stats[col_expr.index()].null_count;
if let &Precision::Exact(val) = current_val {
return Some((
ScalarValue::Int64(Some((num_rows - val) as i64)),
casted_expr.name().to_string(),
));
}
} else if let Some(lit_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Literal>()
{
if lit_expr.value() == &COUNT_STAR_EXPANSION {
return Some((
ScalarValue::Int64(Some(num_rows as i64)),
casted_expr.name().to_owned(),
));
}
}
}
}
None
}
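
The block removed above special-cased the old built-in `Count` physical expression when answering aggregates from exact statistics; the same derivation is presumably retained for the UDAF-based COUNT handled earlier in this function (collapsed in this hunk). Schematically, the arithmetic it performed was:

```rust
// Schematic restatement of the removed logic, not a DataFusion API:
// given exact statistics, COUNT can be answered without scanning any data.
fn count_from_exact_stats(num_rows: usize, column_null_count: Option<usize>) -> i64 {
    match column_null_count {
        // COUNT(col): total rows minus the column's null count
        Some(nulls) => (num_rows - nulls) as i64,
        // COUNT(*), i.e. a COUNT_STAR_EXPANSION literal: all rows
        None => num_rows as i64,
    }
}
```
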

@@ -307,13 +275,12 @@ fn take_optimizable_max(

#[cfg(test)]
pub(crate) mod tests {

use super::*;

use crate::logical_expr::Operator;
use crate::physical_plan::aggregates::PhysicalGroupBy;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::common;
use crate::physical_plan::expressions::Count;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::memory::MemoryExec;
use crate::prelude::SessionContext;
@@ -322,8 +289,10 @@ pub(crate) mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_int64_array;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::cast;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use datafusion_physical_plan::aggregates::AggregateMode;

/// Mock data using a MemoryExec which has an exact count statistic
@@ -414,13 +383,19 @@
Self::ColumnA(schema.clone())
}

/// Return appropriate expr depending if COUNT is for col or table (*)
pub(crate) fn count_expr(&self) -> Arc<dyn AggregateExpr> {
Arc::new(Count::new(
self.column(),
// Return appropriate expr depending if COUNT is for col or table (*)
pub(crate) fn count_expr(&self, schema: &Schema) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[self.column()],
&[],
&[],
schema,
self.column_name(),
DataType::Int64,
))
false,
false,
)
.unwrap()
}

/// what argument would this aggregate need in the plan?
@@ -458,7 +433,7 @@
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
source,
Arc::clone(&schema),
@@ -467,7 +442,7 @@
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
@@ -488,7 +463,7 @@
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
source,
Arc::clone(&schema),
@@ -497,7 +472,7 @@
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
@@ -517,7 +492,7 @@
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
source,
Arc::clone(&schema),
@@ -529,7 +504,7 @@
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
@@ -549,7 +524,7 @@
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
source,
Arc::clone(&schema),
@@ -561,7 +536,7 @@
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
@@ -592,7 +567,7 @@
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
filter,
Arc::clone(&schema),
@@ -601,7 +576,7 @@
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
@@ -637,7 +612,7 @@
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
filter,
Arc::clone(&schema),
@@ -646,7 +621,7 @@
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
47 changes: 26 additions & 21 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -206,8 +206,9 @@ mod tests {
use crate::physical_plan::{displayable, Partitioning};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::{col, Count};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_plan::udaf::create_aggregate_expr;

/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
Expand Down Expand Up @@ -303,15 +304,31 @@ mod tests {
)
}

// Return appropriate expr depending if COUNT is for col or table (*)
fn count_expr(
expr: Arc<dyn PhysicalExpr>,
name: &str,
schema: &Schema,
) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[expr],
&[],
&[],
schema,
name,
false,
false,
)
.unwrap()
}

#[test]
fn aggregations_not_combined() -> Result<()> {
let schema = schema();

let aggr_expr = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];

let plan = final_aggregate_exec(
repartition_exec(partial_aggregate_exec(
parquet_exec(&schema),
Expand All @@ -330,16 +347,8 @@ mod tests {
];
assert_optimized!(expected, plan);

let aggr_expr1 = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr2 = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(2)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr1 = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
let aggr_expr2 = vec![count_expr(lit(1i8), "COUNT(2)", &schema)];

let plan = final_aggregate_exec(
partial_aggregate_exec(
@@ -365,11 +374,7 @@
#[test]
fn aggregations_combined() -> Result<()> {
let schema = schema();
let aggr_expr = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];

let plan = final_aggregate_exec(
partial_aggregate_exec(
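
Both test suites above now build the physical COUNT aggregate the same way, by passing `count_udaf()` to `create_aggregate_expr`. A sketch of that construction, mirroring the helpers in this diff; the comments on the positional arguments are my reading of this version's signature (the empty slices and the two trailing `false`s), not documentation, and should be checked against the crate docs:

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_common::Result;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::AggregateExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;

// Builds the physical expression for `COUNT(a)` against `schema`, the same way
// the test helpers in this PR do (the schema is assumed to contain a column `a`).
fn count_a(schema: &Schema) -> Result<Arc<dyn AggregateExpr>> {
    create_aggregate_expr(
        &count_udaf(),        // COUNT as a user-defined aggregate function
        &[col("a", schema)?], // input expressions: COUNT(a)
        &[],                  // assumed: logical sort expressions (none needed for COUNT)
        &[],                  // assumed: physical ordering requirement (none)
        schema,
        "COUNT(a)",           // name of the output column
        false,                // assumed: ignore-nulls flag
        false,                // assumed: distinct flag
    )
}
```
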
@@ -517,10 +517,10 @@ mod tests {
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![agg.count_expr()], /* aggr_expr */
vec![None], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
vec![agg.count_expr(&schema)], /* aggr_expr */
vec![None], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
@@ -554,10 +554,10 @@
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![agg.count_expr()], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
vec![agg.count_expr(&schema)], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),