From 1f49d4615ebc464394d38b2dea0fa130749ea55f Mon Sep 17 00:00:00 2001 From: Michael J Ward Date: Sun, 16 Jun 2024 10:53:00 -0500 Subject: [PATCH] use ScalarValue::to_pyarrow to convert to python object (#731) Closes #729 --- Cargo.lock | 1 + Cargo.toml | 1 + src/expr.rs | 91 ++--------------------------------------------------- 3 files changed, 5 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb674d0f2..2e437a03f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1032,6 +1032,7 @@ dependencies = [ name = "datafusion-python" version = "39.0.0" dependencies = [ + "arrow", "async-trait", "datafusion", "datafusion-common", diff --git a/Cargo.toml b/Cargo.toml index 85a19d1e5..7285cf3eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ substrait = ["dep:datafusion-substrait"] tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync"] } rand = "0.8" pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38", "gil-refs"] } +arrow = { version = "52", features = ["pyarrow"] } datafusion = { version = "39.0.0", features = ["pyarrow", "avro", "unicode_expressions"] } datafusion-common = { version = "39.0.0", features = ["pyarrow"] } datafusion-expr = "39.0.0" diff --git a/src/expr.rs b/src/expr.rs index 9fd29a591..09a773c4d 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -21,6 +21,7 @@ use pyo3::{basic::CompareOp, prelude::*}; use std::convert::{From, Into}; use std::sync::Arc; +use arrow::pyarrow::ToPyArrow; use datafusion::arrow::datatypes::{DataType, Field}; use datafusion::arrow::pyarrow::PyArrowType; use datafusion::functions::core::expr_ext::FieldAccessor; @@ -32,7 +33,7 @@ use datafusion_expr::{ }; use crate::common::data_type::{DataTypeMap, RexType}; -use crate::errors::{py_datafusion_err, py_runtime_err, py_type_err, DataFusionError}; +use crate::errors::{py_runtime_err, py_type_err, DataFusionError}; use crate::expr::aggregate_expr::PyAggregateFunction; use 
crate::expr::binary_expr::PyBinaryExpr; use crate::expr::column::PyColumn; @@ -300,93 +301,7 @@ impl PyExpr { /// Extracts the Expr value into a PyObject that can be shared with Python pub fn python_value(&self, py: Python) -> PyResult<PyObject> { match &self.expr { - Expr::Literal(scalar_value) => match scalar_value { - ScalarValue::Null => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::Null".to_string(), - ), - )), - ScalarValue::Boolean(v) => Ok(v.into_py(py)), - ScalarValue::Float16(_) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::Float16".to_string(), - ), - )), - ScalarValue::Float32(v) => Ok(v.into_py(py)), - ScalarValue::Float64(v) => Ok(v.into_py(py)), - ScalarValue::Decimal128(v, _, _) => Ok(v.into_py(py)), - ScalarValue::Decimal256(_, _, _) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::Decimal256".to_string(), - ), - )), - ScalarValue::Int8(v) => Ok(v.into_py(py)), - ScalarValue::Int16(v) => Ok(v.into_py(py)), - ScalarValue::Int32(v) => Ok(v.into_py(py)), - ScalarValue::Int64(v) => Ok(v.into_py(py)), - ScalarValue::UInt8(v) => Ok(v.into_py(py)), - ScalarValue::UInt16(v) => Ok(v.into_py(py)), - ScalarValue::UInt32(v) => Ok(v.into_py(py)), - ScalarValue::UInt64(v) => Ok(v.into_py(py)), - ScalarValue::Utf8(v) => Ok(v.clone().into_py(py)), - ScalarValue::LargeUtf8(v) => Ok(v.clone().into_py(py)), - ScalarValue::Binary(v) => Ok(v.clone().into_py(py)), - ScalarValue::FixedSizeBinary(_, _) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::FixedSizeBinary".to_string(), - ), - )), - ScalarValue::LargeBinary(v) => Ok(v.clone().into_py(py)), - ScalarValue::List(_) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::List".to_string(), - ), - )), - ScalarValue::Date32(v) => Ok(v.into_py(py)), - ScalarValue::Date64(v) => Ok(v.into_py(py)), - 
ScalarValue::Time32Second(v) => Ok(v.into_py(py)), - ScalarValue::Time32Millisecond(v) => Ok(v.into_py(py)), - ScalarValue::Time64Microsecond(v) => Ok(v.into_py(py)), - ScalarValue::Time64Nanosecond(v) => Ok(v.into_py(py)), - ScalarValue::TimestampSecond(v, _) => Ok(v.into_py(py)), - ScalarValue::TimestampMillisecond(v, _) => Ok(v.into_py(py)), - ScalarValue::TimestampMicrosecond(v, _) => Ok(v.into_py(py)), - ScalarValue::TimestampNanosecond(v, _) => Ok(v.into_py(py)), - ScalarValue::IntervalYearMonth(v) => Ok(v.into_py(py)), - ScalarValue::IntervalDayTime(v) => Ok(ScalarValue::IntervalDayTime(*v).into_py(py)), - ScalarValue::IntervalMonthDayNano(v) => { - Ok(ScalarValue::IntervalMonthDayNano(*v).into_py(py)) - } - ScalarValue::DurationSecond(v) => Ok(v.into_py(py)), - ScalarValue::DurationMicrosecond(v) => Ok(v.into_py(py)), - ScalarValue::DurationNanosecond(v) => Ok(v.into_py(py)), - ScalarValue::DurationMillisecond(v) => Ok(v.into_py(py)), - ScalarValue::Struct(_) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::Struct".to_string(), - ), - )), - ScalarValue::Dictionary(_, _) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::Dictionary".to_string(), - ), - )), - ScalarValue::FixedSizeList(_) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::FixedSizeList".to_string(), - ), - )), - ScalarValue::LargeList(_) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::LargeList".to_string(), - ), - )), - ScalarValue::Union(_, _, _) => Err(py_datafusion_err( - datafusion_common::DataFusionError::NotImplemented( - "ScalarValue::Union".to_string(), - ), - )), - }, + Expr::Literal(scalar_value) => Ok(scalar_value.to_pyarrow(py)?), _ => Err(py_type_err(format!( "Non Expr::Literal encountered in types: {:?}", &self.expr