From 14000a2ca1bb9bad4c7ce4b7531e934cc41f7a19 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 May 2024 14:50:19 +0000 Subject: [PATCH] chore: upgrade to Datafusion 38 --- Cargo.toml | 16 ++++++------ .../src/delta_datafusion/cdf/scan_utils.rs | 1 + crates/core/src/delta_datafusion/expr.rs | 3 ++- crates/core/src/delta_datafusion/mod.rs | 25 ++++++++----------- crates/core/src/operations/merge/mod.rs | 10 ++++---- crates/sql/src/logical_plan.rs | 18 ++++++------- crates/sql/src/planner.rs | 8 +++--- 7 files changed, 39 insertions(+), 42 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ba72bab789..f5eee93bd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,14 +42,14 @@ object_store = { version = "0.9" } parquet = { version = "51" } # datafusion -datafusion = { version = "37.1" } -datafusion-expr = { version = "37.1" } -datafusion-common = { version = "37.1" } -datafusion-proto = { version = "37.1" } -datafusion-sql = { version = "37.1" } -datafusion-physical-expr = { version = "37.1" } -datafusion-functions = { version = "37.1" } -datafusion-functions-array = { version = "37.1" } +datafusion = { version = "38.0" } +datafusion-expr = { version = "38.0" } +datafusion-common = { version = "38.0" } +datafusion-proto = { version = "38.0" } +datafusion-sql = { version = "38.0" } +datafusion-physical-expr = { version = "38.0" } +datafusion-functions = { version = "38.0" } +datafusion-functions-array = { version = "38.0" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/src/delta_datafusion/cdf/scan_utils.rs b/crates/core/src/delta_datafusion/cdf/scan_utils.rs index 434afa4f74..b7c890a7b1 100644 --- a/crates/core/src/delta_datafusion/cdf/scan_utils.rs +++ b/crates/core/src/delta_datafusion/cdf/scan_utils.rs @@ -80,6 +80,7 @@ pub fn create_partition_values( last_modified: chrono::Utc.timestamp_nanos(0), version: None, }, + statistics: None, partition_values: new_part_values.clone(), extensions: None, range: None, diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 5b62726611..4566e0c1bc 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -429,10 +429,11 @@ mod test { use datafusion_common::{Column, ScalarValue, ToDFSchema}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{ - col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition, + col, lit, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition, }; use datafusion_functions::core::arrow_cast; use datafusion_functions::encoding::expr_fn::decode; + use datafusion_functions::expr_fn::substring; use datafusion_functions_array::expr_fn::cardinality; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index f3f9b5d6cc..cf3ea821b6 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -698,11 +698,11 @@ impl TableProvider for DeltaTable { Ok(Arc::new(scan)) } - fn supports_filter_pushdown( + fn supports_filters_pushdown( &self, - _filter: &Expr, - ) -> DataFusionResult { - Ok(TableProviderFilterPushDown::Inexact) + _filter: &[&Expr], + ) -> DataFusionResult> { + Ok(vec![TableProviderFilterPushDown::Inexact]) } fn statistics(&self) -> Option { @@ -777,11 +777,11 @@ impl TableProvider for DeltaTableProvider { Ok(Arc::new(scan)) } - fn supports_filter_pushdown( + fn supports_filters_pushdown( &self, - _filter: &Expr, - ) -> DataFusionResult { - Ok(TableProviderFilterPushDown::Inexact) + _filter: &[&Expr], + ) -> DataFusionResult> { + Ok(vec![TableProviderFilterPushDown::Inexact]) } fn statistics(&self) -> Option { @@ -989,6 +989,7 @@ pub(crate) fn partitioned_file_from_action( ..action.try_into().unwrap() }, partition_values, + statistics: None, range: None, extensions: None, } @@ -1425,14 +1426,7 @@ impl TreeNodeVisitor for FindFilesExprProperties { | Expr::TryCast(_) => (), Expr::ScalarFunction(ScalarFunction { func_def, .. }) => { let v = match func_def { - datafusion_expr::ScalarFunctionDefinition::BuiltIn(f) => f.volatility(), datafusion_expr::ScalarFunctionDefinition::UDF(u) => u.signature().volatility, - datafusion_expr::ScalarFunctionDefinition::Name(n) => { - self.result = Err(DeltaTableError::Generic(format!( - "Cannot determine volatility of find files predicate function {n}", - ))); - return Ok(TreeNodeRecursion::Stop); - } }; if v > Volatility::Immutable { self.result = Err(DeltaTableError::Generic(format!( @@ -1900,6 +1894,7 @@ mod tests { version: None, }, partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(), + statistics: None, range: None, extensions: None, }; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index ce1ee7b223..a48c8a165a 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -502,7 +502,7 @@ impl MergeOperation { relation: Some(TableReference::Bare { table }), name, } => { - if table.eq(alias) { + if table.as_ref() == alias { Column { relation: Some(r), name, @@ -863,8 +863,8 @@ async fn try_construct_early_filter( table_snapshot: &DeltaTableState, session_state: &SessionState, source: &LogicalPlan, - source_name: &TableReference<'_>, - target_name: &TableReference<'_>, + source_name: &TableReference, + target_name: &TableReference, ) -> DeltaResult> { let table_metadata = table_snapshot.metadata(); let partition_columns = &table_metadata.partition_columns; @@ -1324,9 +1324,9 @@ async fn execute( let plan = projection.into_unoptimized_plan(); let mut fields: Vec = plan .schema() - .fields() + .columns() .iter() - .map(|f| col(f.qualified_column())) + .map(|f| col(f.clone())) .collect(); fields.extend(new_columns.into_iter().map(|(name, ex)| ex.alias(name))); diff --git a/crates/sql/src/logical_plan.rs b/crates/sql/src/logical_plan.rs index 164462a90c..dacc41901a 100644 --- a/crates/sql/src/logical_plan.rs +++ b/crates/sql/src/logical_plan.rs @@ -1,7 +1,7 @@ use std::fmt::{self, Debug, Display}; use std::sync::Arc; -use datafusion_common::{DFSchema, DFSchemaRef, OwnedTableReference}; +use datafusion_common::{DFSchema, DFSchemaRef, TableReference}; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::{Expr, UserDefinedLogicalNodeCore}; @@ -107,7 +107,7 @@ impl UserDefinedLogicalNodeCore for DeltaStatement { #[derive(Clone, PartialEq, Eq, Hash)] pub struct Vacuum { /// A reference to the table being vacuumed - pub table: OwnedTableReference, + pub table: TableReference, /// The retention threshold. pub retention_hours: Option, /// Return a list of up to 1000 files to be deleted. @@ -117,7 +117,7 @@ pub struct Vacuum { } impl Vacuum { - pub fn new(table: OwnedTableReference, retention_hours: Option, dry_run: bool) -> Self { + pub fn new(table: TableReference, retention_hours: Option, dry_run: bool) -> Self { Self { table, retention_hours, @@ -133,13 +133,13 @@ impl Vacuum { #[derive(Clone, PartialEq, Eq, Hash)] pub struct DescribeHistory { /// A reference to the table - pub table: OwnedTableReference, + pub table: TableReference, /// Schema for commit provenence information pub schema: DFSchemaRef, } impl DescribeHistory { - pub fn new(table: OwnedTableReference) -> Self { + pub fn new(table: TableReference) -> Self { Self { table, // TODO: add proper schema @@ -153,13 +153,13 @@ impl DescribeHistory { #[derive(Clone, PartialEq, Eq, Hash)] pub struct DescribeDetails { /// A reference to the table - pub table: OwnedTableReference, + pub table: TableReference, /// Schema for commit provenence information pub schema: DFSchemaRef, } impl DescribeDetails { - pub fn new(table: OwnedTableReference) -> Self { + pub fn new(table: TableReference) -> Self { Self { table, // TODO: add proper schema @@ -172,13 +172,13 @@ impl DescribeDetails { #[derive(Clone, PartialEq, Eq, Hash)] pub struct DescribeFiles { /// A reference to the table - pub table: OwnedTableReference, + pub table: TableReference, /// Schema for commit provenence information pub schema: DFSchemaRef, } impl DescribeFiles { - pub fn new(table: OwnedTableReference) -> Self { + pub fn new(table: TableReference) -> Self { Self { table, // TODO: add proper schema diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs index 0be14d59b0..6214125906 100644 --- a/crates/sql/src/planner.rs +++ b/crates/sql/src/planner.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use datafusion_common::{OwnedTableReference, Result as DFResult}; +use datafusion_common::{TableReference, Result as DFResult}; use datafusion_expr::logical_plan::{Extension, LogicalPlan}; use datafusion_sql::planner::{ object_name_to_table_reference, ContextProvider, IdentNormalizer, ParserOptions, SqlToRel, @@ -54,7 +54,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> { fn vacuum_to_plan(&self, vacuum: VacuumStatement) -> DFResult { let table_ref = self.object_name_to_table_reference(vacuum.table)?; let plan = DeltaStatement::Vacuum(Vacuum::new( - table_ref.to_owned_reference(), + table_ref.clone(), vacuum.retention_hours, vacuum.dry_run, )); @@ -66,7 +66,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> { fn describe_to_plan(&self, describe: DescribeStatement) -> DFResult { let table_ref = self.object_name_to_table_reference(describe.table)?; let plan = - DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.to_owned_reference())); + DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.clone())); Ok(LogicalPlan::Extension(Extension { node: Arc::new(plan), })) @@ -75,7 +75,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> { pub(crate) fn object_name_to_table_reference( &self, object_name: ObjectName, - ) -> DFResult { + ) -> DFResult { object_name_to_table_reference(object_name, self.options.enable_ident_normalization) } }