
Commit

Merge branch 'main' of github.com:apache/datafusion into type-class
jayzhan211 committed Dec 12, 2024
2 parents 4a7404d + 08119e6 commit 7400429
Showing 39 changed files with 1,760 additions and 1,265 deletions.
10 changes: 5 additions & 5 deletions datafusion-examples/examples/parse_sql_expr.rs
@@ -121,11 +121,11 @@ async fn query_parquet_demo() -> Result<()> {

assert_batches_eq!(
&[
"+------------+----------------------+",
"| double_col | sum(?table?.int_col) |",
"+------------+----------------------+",
"| 10.1 | 4 |",
"+------------+----------------------+",
"+------------+-------------+",
"| double_col | sum_int_col |",
"+------------+-------------+",
"| 10.1 | 4 |",
"+------------+-------------+",
],
&result
);
14 changes: 13 additions & 1 deletion datafusion/catalog/src/table.rs
@@ -33,7 +33,19 @@ use datafusion_expr::{
};
use datafusion_physical_plan::ExecutionPlan;

/// Source table
/// A named table which can be queried.
///
/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
///
/// [`TableProvider`] represents a source of data which can provide data as
/// Apache Arrow `RecordBatch`es. Implementations of this trait provide
/// important information for planning such as:
///
/// 1. [`Self::schema`]: The schema (columns and their types) of the table
/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
/// 3. [`Self::scan`]: An [`ExecutionPlan`] that can read data
///
/// [`CatalogProvider`]: super::CatalogProvider
#[async_trait]
pub trait TableProvider: Debug + Sync + Send {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
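
As a companion to the expanded `TableProvider` docs above, here is a hedged usage sketch (not part of this commit): registering the built-in `MemTable` provider and querying it. The crate paths and the tokio runtime dependency are assumptions about a typical `datafusion` setup.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Build a one-column, one-batch in-memory table.
    let schema = Arc::new(Schema::new(vec![Field::new("int_col", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 4]))],
    )?;

    // MemTable is one built-in implementation of the TableProvider trait:
    // it reports its schema up front and yields an ExecutionPlan over the
    // in-memory batches when scanned.
    let table = MemTable::try_new(schema, vec![vec![batch]])?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(table))?;
    ctx.sql("SELECT sum(int_col) FROM t").await?.show().await?;
    Ok(())
}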
21 changes: 16 additions & 5 deletions datafusion/core/src/execution/session_state.rs
@@ -68,7 +68,7 @@ use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, Sq
use itertools::Itertools;
use log::{debug, info};
use object_store::ObjectStore;
use sqlparser::ast::Expr as SQLExpr;
use sqlparser::ast::{Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias};
use sqlparser::dialect::dialect_from_str;
use std::any::Any;
use std::collections::hash_map::Entry;
@@ -500,11 +500,22 @@ impl SessionState {
sql: &str,
dialect: &str,
) -> datafusion_common::Result<SQLExpr> {
self.sql_to_expr_with_alias(sql, dialect).map(|x| x.expr)
}

/// parse a sql string into a sqlparser-rs AST [`SQLExprWithAlias`].
///
/// See [`Self::create_logical_expr`] for parsing sql to [`Expr`].
pub fn sql_to_expr_with_alias(
&self,
sql: &str,
dialect: &str,
) -> datafusion_common::Result<SQLExprWithAlias> {
let dialect = dialect_from_str(dialect).ok_or_else(|| {
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
)
})?;

@@ -603,15 +614,15 @@
) -> datafusion_common::Result<Expr> {
let dialect = self.config.options().sql_parser.dialect.as_str();

let sql_expr = self.sql_to_expr(sql, dialect)?;
let sql_expr = self.sql_to_expr_with_alias(sql, dialect)?;

let provider = SessionContextProvider {
state: self,
tables: HashMap::new(),
};

let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
query.sql_to_expr(sql_expr, df_schema, &mut PlannerContext::new())
query.sql_to_expr_with_alias(sql_expr, df_schema, &mut PlannerContext::new())
}

/// Returns the [`Analyzer`] for this session
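
A hedged sketch of what the new alias-aware entry point adds (not taken from this commit; the `expr`/`alias` field names on sqlparser's `ExprWithAlias` are assumed from that crate's AST):

use datafusion::prelude::SessionContext;

fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let state = ctx.state();

    // The existing sql_to_expr path still returns just the parsed expression...
    let bare = state.sql_to_expr("price * quantity AS total", "generic")?;
    println!("{bare:?}");

    // ...while sql_to_expr_with_alias also keeps the `AS total` alias, which
    // create_logical_expr can now thread through to the SQL planner.
    let with_alias = state.sql_to_expr_with_alias("price * quantity AS total", "generic")?;
    assert_eq!(
        with_alias.alias.map(|ident| ident.value),
        Some("total".to_string())
    );
    Ok(())
}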
101 changes: 101 additions & 0 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -28,6 +28,7 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::tree_node::PlanContext;
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use arrow_schema::SchemaRef;

use datafusion_common::tree_node::{
ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion,
@@ -38,6 +39,8 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::PhysicalSortRequirement;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::joins::utils::ColumnIndex;
use datafusion_physical_plan::joins::HashJoinExec;

/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
@@ -294,6 +297,8 @@ fn pushdown_requirement_to_children(
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req]))
}
} else if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
handle_hash_join(hash_join, parent_required)
} else {
handle_custom_pushdown(plan, parent_required, maintains_input_order)
}
@@ -606,6 +611,102 @@ fn handle_custom_pushdown(
}
}

// For a hash join we only maintain the input order of the right child,
// and only for the join types Inner, Right, RightSemi and RightAnti
fn handle_hash_join(
plan: &HashJoinExec,
parent_required: &LexRequirement,
) -> Result<Option<Vec<Option<LexRequirement>>>> {
// If there's no requirement from the parent or the plan has no children
// or the join type is not Inner, Right, RightSemi, RightAnti, return early
if parent_required.is_empty() || !plan.maintains_input_order()[1] {
return Ok(None);
}

// Collect all unique column indices used in the parent-required sorting expression
let all_indices: HashSet<usize> = parent_required
.iter()
.flat_map(|order| {
collect_columns(&order.expr)
.into_iter()
.map(|col| col.index())
.collect::<HashSet<_>>()
})
.collect();

let column_indices = build_join_column_index(plan);
let projected_indices: Vec<_> = if let Some(projection) = &plan.projection {
projection.iter().map(|&i| &column_indices[i]).collect()
} else {
column_indices.iter().collect()
};
let len_of_left_fields = projected_indices
.iter()
.filter(|ci| ci.side == JoinSide::Left)
.count();

let all_from_right_child = all_indices.iter().all(|i| *i >= len_of_left_fields);

// If all columns are from the right child, update the parent requirements
if all_from_right_child {
// Transform the parent-required expression for the child schema by adjusting columns
let updated_parent_req = parent_required
.iter()
.map(|req| {
let child_schema = plan.children()[1].schema();
let updated_columns = Arc::clone(&req.expr)
.transform_up(|expr| {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
let index = projected_indices[col.index()].index;
Ok(Transformed::yes(Arc::new(Column::new(
child_schema.field(index).name(),
index,
))))
} else {
Ok(Transformed::no(expr))
}
})?
.data;
Ok(PhysicalSortRequirement::new(updated_columns, req.options))
})
.collect::<Result<Vec<_>>>()?;

// Populating with the updated requirements for children that maintain order
Ok(Some(vec![
None,
Some(LexRequirement::new(updated_parent_req)),
]))
} else {
Ok(None)
}
}

// Build the output column index for the hash join, used when pushing
// sort requirements down to the right child
fn build_join_column_index(plan: &HashJoinExec) -> Vec<ColumnIndex> {
let map_fields = |schema: SchemaRef, side: JoinSide| {
schema
.fields()
.iter()
.enumerate()
.map(|(index, _)| ColumnIndex { index, side })
.collect::<Vec<_>>()
};

match plan.join_type() {
JoinType::Inner | JoinType::Right => {
map_fields(plan.left().schema(), JoinSide::Left)
.into_iter()
.chain(map_fields(plan.right().schema(), JoinSide::Right))
.collect::<Vec<_>>()
}
JoinType::RightSemi | JoinType::RightAnti => {
map_fields(plan.right().schema(), JoinSide::Right)
}
_ => unreachable!("unexpected join type: {}", plan.join_type()),
}
}

/// Define the Requirements Compatibility
#[derive(Debug)]
enum RequirementsCompatibility {
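
To make the index bookkeeping in handle_hash_join concrete, here is a small self-contained toy (not DataFusion code): with the left fields laid out first in the join output, a parent sort requirement is only pushable when every referenced column index lands past the left fields, and the pushed requirement is re-indexed into the right child's schema.

/// Map an Inner/Right hash join output column index to an index in the right
/// child's schema, if it refers to a right-side column at all.
fn maps_to_right_child(output_index: usize, len_of_left_fields: usize) -> Option<usize> {
    (output_index >= len_of_left_fields).then(|| output_index - len_of_left_fields)
}

fn main() {
    // Output layout: 3 left columns (0..=2) followed by 2 right columns (3..=4).
    let len_of_left_fields = 3;

    // A sort requirement on output column 4 maps to right-child column 1,
    // so it can be pushed below the join.
    assert_eq!(maps_to_right_child(4, len_of_left_fields), Some(1));

    // A requirement on output column 1 touches a left-side column, so
    // handle_hash_join returns None and no pushdown happens.
    assert_eq!(maps_to_right_child(1, len_of_left_fields), None);
}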
6 changes: 4 additions & 2 deletions datafusion/expr/src/expr_fn.rs
@@ -416,9 +416,10 @@ pub struct SimpleScalarUDF

impl Debug for SimpleScalarUDF {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ScalarUDF")
f.debug_struct("SimpleScalarUDF")
.field("name", &self.name)
.field("signature", &self.signature)
.field("return_type", &self.return_type)
.field("fun", &"<FUNC>")
.finish()
}
@@ -524,9 +525,10 @@ pub struct SimpleAggregateUDF

impl Debug for SimpleAggregateUDF {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("AggregateUDF")
f.debug_struct("SimpleAggregateUDF")
.field("name", &self.name)
.field("signature", &self.signature)
.field("return_type", &self.return_type)
.field("fun", &"<FUNC>")
.finish()
}
22 changes: 11 additions & 11 deletions datafusion/functions/src/core/mod.rs
@@ -35,17 +35,17 @@ pub mod r#struct;
pub mod version;

// create UDFs
make_udf_function!(arrow_cast::ArrowCastFunc, ARROW_CAST, arrow_cast);
make_udf_function!(nullif::NullIfFunc, NULLIF, nullif);
make_udf_function!(nvl::NVLFunc, NVL, nvl);
make_udf_function!(nvl2::NVL2Func, NVL2, nvl2);
make_udf_function!(arrowtypeof::ArrowTypeOfFunc, ARROWTYPEOF, arrow_typeof);
make_udf_function!(r#struct::StructFunc, STRUCT, r#struct);
make_udf_function!(named_struct::NamedStructFunc, NAMED_STRUCT, named_struct);
make_udf_function!(getfield::GetFieldFunc, GET_FIELD, get_field);
make_udf_function!(coalesce::CoalesceFunc, COALESCE, coalesce);
make_udf_function!(greatest::GreatestFunc, GREATEST, greatest);
make_udf_function!(version::VersionFunc, VERSION, version);
make_udf_function!(arrow_cast::ArrowCastFunc, arrow_cast);
make_udf_function!(nullif::NullIfFunc, nullif);
make_udf_function!(nvl::NVLFunc, nvl);
make_udf_function!(nvl2::NVL2Func, nvl2);
make_udf_function!(arrowtypeof::ArrowTypeOfFunc, arrow_typeof);
make_udf_function!(r#struct::StructFunc, r#struct);
make_udf_function!(named_struct::NamedStructFunc, named_struct);
make_udf_function!(getfield::GetFieldFunc, get_field);
make_udf_function!(coalesce::CoalesceFunc, coalesce);
make_udf_function!(greatest::GreatestFunc, greatest);
make_udf_function!(version::VersionFunc, version);

pub mod expr_fn {
use datafusion_expr::{Expr, Literal};
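
The make_udf_function! calls drop their second argument here: the screaming-case static (ARROW_CAST, NULLIF, ...) no longer has to be named by the caller. Below is a self-contained sketch of the kind of singleton accessor such a macro generates; the expansion details are an assumption, not copied from the real macro.

use std::sync::{Arc, OnceLock};

// Stand-in for datafusion_expr::ScalarUDF, just to keep the sketch runnable.
#[derive(Debug)]
struct ScalarUdfStub {
    name: &'static str,
}

// Roughly what `make_udf_function!(nvl::NVLFunc, nvl)` is expected to produce:
// an accessor that lazily builds the UDF once and hands out shared clones,
// with the backing static owned by the macro instead of the caller.
fn nvl() -> Arc<ScalarUdfStub> {
    static INSTANCE: OnceLock<Arc<ScalarUdfStub>> = OnceLock::new();
    Arc::clone(INSTANCE.get_or_init(|| Arc::new(ScalarUdfStub { name: "nvl" })))
}

fn main() {
    // Every call returns the same shared instance.
    assert!(Arc::ptr_eq(&nvl(), &nvl()));
    println!("singleton UDF: {:?}", nvl());
}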
12 changes: 6 additions & 6 deletions datafusion/functions/src/crypto/mod.rs
@@ -27,12 +27,12 @@ pub mod sha224;
pub mod sha256;
pub mod sha384;
pub mod sha512;
make_udf_function!(digest::DigestFunc, DIGEST, digest);
make_udf_function!(md5::Md5Func, MD5, md5);
make_udf_function!(sha224::SHA224Func, SHA224, sha224);
make_udf_function!(sha256::SHA256Func, SHA256, sha256);
make_udf_function!(sha384::SHA384Func, SHA384, sha384);
make_udf_function!(sha512::SHA512Func, SHA512, sha512);
make_udf_function!(digest::DigestFunc, digest);
make_udf_function!(md5::Md5Func, md5);
make_udf_function!(sha224::SHA224Func, sha224);
make_udf_function!(sha256::SHA256Func, sha256);
make_udf_function!(sha384::SHA384Func, sha384);
make_udf_function!(sha512::SHA512Func, sha512);

pub mod expr_fn {
export_functions!((
54 changes: 17 additions & 37 deletions datafusion/functions/src/datetime/mod.rs
@@ -37,43 +37,23 @@ pub mod to_timestamp;
pub mod to_unixtime;

// create UDFs
make_udf_function!(current_date::CurrentDateFunc, CURRENT_DATE, current_date);
make_udf_function!(current_time::CurrentTimeFunc, CURRENT_TIME, current_time);
make_udf_function!(date_bin::DateBinFunc, DATE_BIN, date_bin);
make_udf_function!(date_part::DatePartFunc, DATE_PART, date_part);
make_udf_function!(date_trunc::DateTruncFunc, DATE_TRUNC, date_trunc);
make_udf_function!(make_date::MakeDateFunc, MAKE_DATE, make_date);
make_udf_function!(
from_unixtime::FromUnixtimeFunc,
FROM_UNIXTIME,
from_unixtime
);
make_udf_function!(now::NowFunc, NOW, now);
make_udf_function!(to_char::ToCharFunc, TO_CHAR, to_char);
make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date);
make_udf_function!(to_local_time::ToLocalTimeFunc, TO_LOCAL_TIME, to_local_time);
make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime);
make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp);
make_udf_function!(
to_timestamp::ToTimestampSecondsFunc,
TO_TIMESTAMP_SECONDS,
to_timestamp_seconds
);
make_udf_function!(
to_timestamp::ToTimestampMillisFunc,
TO_TIMESTAMP_MILLIS,
to_timestamp_millis
);
make_udf_function!(
to_timestamp::ToTimestampMicrosFunc,
TO_TIMESTAMP_MICROS,
to_timestamp_micros
);
make_udf_function!(
to_timestamp::ToTimestampNanosFunc,
TO_TIMESTAMP_NANOS,
to_timestamp_nanos
);
make_udf_function!(current_date::CurrentDateFunc, current_date);
make_udf_function!(current_time::CurrentTimeFunc, current_time);
make_udf_function!(date_bin::DateBinFunc, date_bin);
make_udf_function!(date_part::DatePartFunc, date_part);
make_udf_function!(date_trunc::DateTruncFunc, date_trunc);
make_udf_function!(make_date::MakeDateFunc, make_date);
make_udf_function!(from_unixtime::FromUnixtimeFunc, from_unixtime);
make_udf_function!(now::NowFunc, now);
make_udf_function!(to_char::ToCharFunc, to_char);
make_udf_function!(to_date::ToDateFunc, to_date);
make_udf_function!(to_local_time::ToLocalTimeFunc, to_local_time);
make_udf_function!(to_unixtime::ToUnixtimeFunc, to_unixtime);
make_udf_function!(to_timestamp::ToTimestampFunc, to_timestamp);
make_udf_function!(to_timestamp::ToTimestampSecondsFunc, to_timestamp_seconds);
make_udf_function!(to_timestamp::ToTimestampMillisFunc, to_timestamp_millis);
make_udf_function!(to_timestamp::ToTimestampMicrosFunc, to_timestamp_micros);
make_udf_function!(to_timestamp::ToTimestampNanosFunc, to_timestamp_nanos);

// we cannot currently use the export_functions macro since it doesn't
// handle functions with varargs
4 changes: 2 additions & 2 deletions datafusion/functions/src/encoding/mod.rs
@@ -21,8 +21,8 @@ use std::sync::Arc;
pub mod inner;

// create `encode` and `decode` UDFs
make_udf_function!(inner::EncodeFunc, ENCODE, encode);
make_udf_function!(inner::DecodeFunc, DECODE, decode);
make_udf_function!(inner::EncodeFunc, encode);
make_udf_function!(inner::DecodeFunc, decode);

// Export the functions out of this package, both as expr_fn as well as a list of functions
pub mod expr_fn {
