Skip to content

Commit

Permalink
chore: upgrade to Datafusion 38
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed May 13, 2024
1 parent 353e08b commit 14000a2
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 42 deletions.
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ object_store = { version = "0.9" }
parquet = { version = "51" }

# datafusion
datafusion = { version = "37.1" }
datafusion-expr = { version = "37.1" }
datafusion-common = { version = "37.1" }
datafusion-proto = { version = "37.1" }
datafusion-sql = { version = "37.1" }
datafusion-physical-expr = { version = "37.1" }
datafusion-functions = { version = "37.1" }
datafusion-functions-array = { version = "37.1" }
datafusion = { version = "38.0" }
datafusion-expr = { version = "38.0" }
datafusion-common = { version = "38.0" }
datafusion-proto = { version = "38.0" }
datafusion-sql = { version = "38.0" }
datafusion-physical-expr = { version = "38.0" }
datafusion-functions = { version = "38.0" }
datafusion-functions-array = { version = "38.0" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/delta_datafusion/cdf/scan_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub fn create_partition_values<F: FileAction>(
last_modified: chrono::Utc.timestamp_nanos(0),
version: None,
},
statistics: None,
partition_values: new_part_values.clone(),
extensions: None,
range: None,
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,11 @@ mod test {
use datafusion_common::{Column, ScalarValue, ToDFSchema};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{
col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
col, lit, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
};
use datafusion_functions::core::arrow_cast;
use datafusion_functions::encoding::expr_fn::decode;
use datafusion_functions::expr_fn::substring;
use datafusion_functions_array::expr_fn::cardinality;

use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
Expand Down
25 changes: 10 additions & 15 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,11 +698,11 @@ impl TableProvider for DeltaTable {
Ok(Arc::new(scan))
}

fn supports_filter_pushdown(
fn supports_filters_pushdown(
&self,
_filter: &Expr,
) -> DataFusionResult<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Inexact)
_filter: &[&Expr],
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact])
}

fn statistics(&self) -> Option<Statistics> {
Expand Down Expand Up @@ -777,11 +777,11 @@ impl TableProvider for DeltaTableProvider {
Ok(Arc::new(scan))
}

fn supports_filter_pushdown(
fn supports_filters_pushdown(
&self,
_filter: &Expr,
) -> DataFusionResult<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Inexact)
_filter: &[&Expr],
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact])
}

fn statistics(&self) -> Option<Statistics> {
Expand Down Expand Up @@ -989,6 +989,7 @@ pub(crate) fn partitioned_file_from_action(
..action.try_into().unwrap()
},
partition_values,
statistics: None,
range: None,
extensions: None,
}
Expand Down Expand Up @@ -1425,14 +1426,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
| Expr::TryCast(_) => (),
Expr::ScalarFunction(ScalarFunction { func_def, .. }) => {
let v = match func_def {
datafusion_expr::ScalarFunctionDefinition::BuiltIn(f) => f.volatility(),
datafusion_expr::ScalarFunctionDefinition::UDF(u) => u.signature().volatility,
datafusion_expr::ScalarFunctionDefinition::Name(n) => {
self.result = Err(DeltaTableError::Generic(format!(
"Cannot determine volatility of find files predicate function {n}",
)));
return Ok(TreeNodeRecursion::Stop);
}
};
if v > Volatility::Immutable {
self.result = Err(DeltaTableError::Generic(format!(
Expand Down Expand Up @@ -1900,6 +1894,7 @@ mod tests {
version: None,
},
partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
statistics: None,
range: None,
extensions: None,
};
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl MergeOperation {
relation: Some(TableReference::Bare { table }),
name,
} => {
if table.eq(alias) {
if table.as_ref() == alias {
Column {
relation: Some(r),
name,
Expand Down Expand Up @@ -863,8 +863,8 @@ async fn try_construct_early_filter(
table_snapshot: &DeltaTableState,
session_state: &SessionState,
source: &LogicalPlan,
source_name: &TableReference<'_>,
target_name: &TableReference<'_>,
source_name: &TableReference,
target_name: &TableReference,
) -> DeltaResult<Option<Expr>> {
let table_metadata = table_snapshot.metadata();
let partition_columns = &table_metadata.partition_columns;
Expand Down Expand Up @@ -1324,9 +1324,9 @@ async fn execute(
let plan = projection.into_unoptimized_plan();
let mut fields: Vec<Expr> = plan
.schema()
.fields()
.columns()
.iter()
.map(|f| col(f.qualified_column()))
.map(|f| col(f.clone()))
.collect();

fields.extend(new_columns.into_iter().map(|(name, ex)| ex.alias(name)));
Expand Down
18 changes: 9 additions & 9 deletions crates/sql/src/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::{self, Debug, Display};
use std::sync::Arc;

use datafusion_common::{DFSchema, DFSchemaRef, OwnedTableReference};
use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{Expr, UserDefinedLogicalNodeCore};

Expand Down Expand Up @@ -107,7 +107,7 @@ impl UserDefinedLogicalNodeCore for DeltaStatement {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Vacuum {
/// A reference to the table being vacuumed
pub table: OwnedTableReference,
pub table: TableReference,
/// The retention threshold.
pub retention_hours: Option<i32>,
/// Return a list of up to 1000 files to be deleted.
Expand All @@ -117,7 +117,7 @@ pub struct Vacuum {
}

impl Vacuum {
pub fn new(table: OwnedTableReference, retention_hours: Option<i32>, dry_run: bool) -> Self {
pub fn new(table: TableReference, retention_hours: Option<i32>, dry_run: bool) -> Self {
Self {
table,
retention_hours,
Expand All @@ -133,13 +133,13 @@ impl Vacuum {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DescribeHistory {
/// A reference to the table
pub table: OwnedTableReference,
pub table: TableReference,
/// Schema for commit provenance information
pub schema: DFSchemaRef,
}

impl DescribeHistory {
pub fn new(table: OwnedTableReference) -> Self {
pub fn new(table: TableReference) -> Self {
Self {
table,
// TODO: add proper schema
Expand All @@ -153,13 +153,13 @@ impl DescribeHistory {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DescribeDetails {
/// A reference to the table
pub table: OwnedTableReference,
pub table: TableReference,
/// Schema for commit provenance information
pub schema: DFSchemaRef,
}

impl DescribeDetails {
pub fn new(table: OwnedTableReference) -> Self {
pub fn new(table: TableReference) -> Self {
Self {
table,
// TODO: add proper schema
Expand All @@ -172,13 +172,13 @@ impl DescribeDetails {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DescribeFiles {
/// A reference to the table
pub table: OwnedTableReference,
pub table: TableReference,
/// Schema for commit provenance information
pub schema: DFSchemaRef,
}

impl DescribeFiles {
pub fn new(table: OwnedTableReference) -> Self {
pub fn new(table: TableReference) -> Self {
Self {
table,
// TODO: add proper schema
Expand Down
8 changes: 4 additions & 4 deletions crates/sql/src/planner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use datafusion_common::{OwnedTableReference, Result as DFResult};
use datafusion_common::{TableReference, Result as DFResult};
use datafusion_expr::logical_plan::{Extension, LogicalPlan};
use datafusion_sql::planner::{
object_name_to_table_reference, ContextProvider, IdentNormalizer, ParserOptions, SqlToRel,
Expand Down Expand Up @@ -54,7 +54,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
fn vacuum_to_plan(&self, vacuum: VacuumStatement) -> DFResult<LogicalPlan> {
let table_ref = self.object_name_to_table_reference(vacuum.table)?;
let plan = DeltaStatement::Vacuum(Vacuum::new(
table_ref.to_owned_reference(),
table_ref.clone(),
vacuum.retention_hours,
vacuum.dry_run,
));
Expand All @@ -66,7 +66,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
fn describe_to_plan(&self, describe: DescribeStatement) -> DFResult<LogicalPlan> {
let table_ref = self.object_name_to_table_reference(describe.table)?;
let plan =
DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.to_owned_reference()));
DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.clone()));
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(plan),
}))
Expand All @@ -75,7 +75,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
pub(crate) fn object_name_to_table_reference(
&self,
object_name: ObjectName,
) -> DFResult<OwnedTableReference> {
) -> DFResult<TableReference> {
object_name_to_table_reference(object_name, self.options.enable_ident_normalization)
}
}
Expand Down

0 comments on commit 14000a2

Please sign in to comment.