From e2b368a89ac1681175a8ea4b6a8a9d11e8793511 Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Sat, 24 Feb 2024 16:22:49 -0500 Subject: [PATCH] Convert remaining UDFs to use ScalarUDFImpl --- .../src/udfs/datetime/str_to_utc_timestamp.rs | 2 +- .../src/udfs/datetime/to_utc_timestamp.rs | 62 ++++++++++---- .../udfs/datetime/utc_timestamp_to_epoch.rs | 78 +++++++++++------ .../src/udfs/datetime/utc_timestamp_to_str.rs | 85 ++++++++++++------- vegafusion-sql/src/compile/scalar.rs | 57 +++++++++---- 5 files changed, 192 insertions(+), 92 deletions(-) diff --git a/vegafusion-datafusion-udfs/src/udfs/datetime/str_to_utc_timestamp.rs b/vegafusion-datafusion-udfs/src/udfs/datetime/str_to_utc_timestamp.rs index 7f903888..981acb68 100644 --- a/vegafusion-datafusion-udfs/src/udfs/datetime/str_to_utc_timestamp.rs +++ b/vegafusion-datafusion-udfs/src/udfs/datetime/str_to_utc_timestamp.rs @@ -357,7 +357,7 @@ impl ScalarUDFImpl for StrToUtcTimestampUDF { fn return_type( &self, - arg_types: &[DataType], + _arg_types: &[DataType], ) -> vegafusion_common::datafusion_common::Result { Ok(DataType::Timestamp(TimeUnit::Millisecond, None)) } diff --git a/vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs b/vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs index 036cca04..1a88dd8c 100644 --- a/vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs +++ b/vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs @@ -1,9 +1,11 @@ use chrono::TimeZone; use chrono::{NaiveDateTime, Timelike}; use chrono_tz::Tz; +use std::any::Any; use std::str::FromStr; use std::sync::Arc; use vegafusion_common::arrow::array::Array; +use vegafusion_common::datafusion_expr::ScalarUDFImpl; use vegafusion_common::{ arrow::{ array::{ArrayRef, TimestampMillisecondArray}, @@ -11,14 +13,48 @@ use vegafusion_common::{ datatypes::{DataType, TimeUnit}, }, datafusion_common::{DataFusionError, ScalarValue}, - datafusion_expr::{ - ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, - Volatility, - }, + datafusion_expr::{ColumnarValue, ScalarUDF, Signature, Volatility}, }; -fn make_to_utc_timestamp_udf() -> ScalarUDF { - let scalar_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| { +#[derive(Debug, Clone)] +pub struct ToUtcTimestampUDF { + signature: Signature, +} + +impl ToUtcTimestampUDF { + pub fn new() -> Self { + // Signature should be (Timestamp, UTF8), but specifying Timestamp in the signature + // requires specifying the timezone explicitly, and DataFusion doesn't currently + // coerce between timezones. + let signature: Signature = Signature::any(2, Volatility::Immutable); + Self { signature } + } +} + +impl ScalarUDFImpl for ToUtcTimestampUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "to_utc_timestamp" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type( + &self, + _arg_types: &[DataType], + ) -> vegafusion_common::datafusion_common::Result { + Ok(DataType::Timestamp(TimeUnit::Millisecond, None)) + } + + fn invoke( + &self, + args: &[ColumnarValue], + ) -> vegafusion_common::datafusion_common::Result { // [0] data array let timestamp_array = match &args[0] { ColumnarValue::Array(array) => array.clone(), @@ -45,17 +81,7 @@ fn make_to_utc_timestamp_udf() -> ScalarUDF { } else { ScalarValue::try_from_array(&result_array, 0).map(ColumnarValue::Scalar) } - }); - - let return_type: ReturnTypeFunction = - Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Millisecond, None)))); - - // Signature should be (Timestamp, UTF8), but specifying Timestamp in the signature - // requires specifying the timezone explicitly, and DataFusion doesn't currently - // coerce between timezones. - let signature: Signature = Signature::any(2, Volatility::Immutable); - - ScalarUDF::new("to_utc_timestamp", &signature, &return_type, &scalar_fn) + } } pub fn to_utc_timestamp(timestamp_array: ArrayRef, tz: Tz) -> Result { @@ -127,5 +153,5 @@ pub fn to_timestamp_ms(array: &ArrayRef) -> Result { } lazy_static! { - pub static ref TO_UTC_TIMESTAMP_UDF: ScalarUDF = make_to_utc_timestamp_udf(); + pub static ref TO_UTC_TIMESTAMP_UDF: ScalarUDF = ScalarUDF::from(ToUtcTimestampUDF::new()); } diff --git a/vegafusion-datafusion-udfs/src/udfs/datetime/utc_timestamp_to_epoch.rs b/vegafusion-datafusion-udfs/src/udfs/datetime/utc_timestamp_to_epoch.rs index 1fd4780e..d48ae1b9 100644 --- a/vegafusion-datafusion-udfs/src/udfs/datetime/utc_timestamp_to_epoch.rs +++ b/vegafusion-datafusion-udfs/src/udfs/datetime/utc_timestamp_to_epoch.rs @@ -1,20 +1,59 @@ use crate::udfs::datetime::to_utc_timestamp::to_timestamp_ms; -use std::sync::Arc; -use vegafusion_common::datafusion_expr::TypeSignature; +use std::any::Any; +use vegafusion_common::datafusion_expr::{ScalarUDFImpl, TypeSignature}; use vegafusion_common::{ arrow::{ compute::cast, datatypes::{DataType, TimeUnit}, }, datafusion_common::ScalarValue, - datafusion_expr::{ - ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, - Volatility, - }, + datafusion_expr::{ColumnarValue, ScalarUDF, Signature, Volatility}, }; -fn make_utc_timestamp_to_epoch_ms_udf() -> ScalarUDF { - let time_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| { +#[derive(Debug, Clone)] +pub struct UtcTimestampToEpochUDF { + signature: Signature, +} + +impl UtcTimestampToEpochUDF { + pub fn new() -> Self { + let signature = Signature::one_of( + vec![ + TypeSignature::Exact(vec![DataType::Date32]), + TypeSignature::Exact(vec![DataType::Date64]), + TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Millisecond, None)]), + TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Nanosecond, None)]), + ], + Volatility::Immutable, + ); + Self { signature } + } +} + +impl ScalarUDFImpl for UtcTimestampToEpochUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "utc_timestamp_to_epoch_ms" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type( + &self, + _arg_types: &[DataType], + ) -> vegafusion_common::datafusion_common::Result { + Ok(DataType::Int64) + } + + fn invoke( + &self, + args: &[ColumnarValue], + ) -> vegafusion_common::datafusion_common::Result { // [0] data array let data_array = match &args[0] { ColumnarValue::Array(array) => array.clone(), @@ -31,27 +70,10 @@ fn make_utc_timestamp_to_epoch_ms_udf() -> ScalarUDF { } else { ScalarValue::try_from_array(&result_array, 0).map(ColumnarValue::Scalar) } - }); - - let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Int64))); - let signature = Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Date32]), - TypeSignature::Exact(vec![DataType::Date64]), - TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Millisecond, None)]), - TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Nanosecond, None)]), - ], - Volatility::Immutable, - ); - - ScalarUDF::new( - "utc_timestamp_to_epoch_ms", - &signature, - &return_type, - &time_fn, - ) + } } lazy_static! { - pub static ref UTC_TIMESTAMP_TO_EPOCH_MS: ScalarUDF = make_utc_timestamp_to_epoch_ms_udf(); + pub static ref UTC_TIMESTAMP_TO_EPOCH_MS: ScalarUDF = + ScalarUDF::from(UtcTimestampToEpochUDF::new()); } diff --git a/vegafusion-datafusion-udfs/src/udfs/datetime/utc_timestamp_to_str.rs b/vegafusion-datafusion-udfs/src/udfs/datetime/utc_timestamp_to_str.rs index 88ae6749..7df843c8 100644 --- a/vegafusion-datafusion-udfs/src/udfs/datetime/utc_timestamp_to_str.rs +++ b/vegafusion-datafusion-udfs/src/udfs/datetime/utc_timestamp_to_str.rs @@ -1,21 +1,67 @@ use crate::udfs::datetime::from_utc_timestamp::from_utc_timestamp; use crate::udfs::datetime::to_utc_timestamp::to_timestamp_ms; use chrono::NaiveDateTime; +use std::any::Any; use std::str::FromStr; use std::sync::Arc; use vegafusion_common::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray}; use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue}; -use vegafusion_common::datafusion_expr::TypeSignature; +use vegafusion_common::datafusion_expr::{ScalarUDFImpl, TypeSignature}; use vegafusion_common::{ arrow::datatypes::{DataType, TimeUnit}, - datafusion_expr::{ - ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, - Volatility, - }, + datafusion_expr::{ColumnarValue, ScalarUDF, Signature, Volatility}, }; -fn make_utc_timestamp_to_str_udf() -> ScalarUDF { - let scalar_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| { +#[derive(Debug, Clone)] +pub struct UtcTimestampToStrUDF { + signature: Signature, +} + +impl UtcTimestampToStrUDF { + pub fn new() -> Self { + let signature = Signature::one_of( + vec![ + TypeSignature::Exact(vec![DataType::Date32, DataType::Utf8]), + TypeSignature::Exact(vec![DataType::Date64, DataType::Utf8]), + TypeSignature::Exact(vec![ + DataType::Timestamp(TimeUnit::Millisecond, None), + DataType::Utf8, + ]), + TypeSignature::Exact(vec![ + DataType::Timestamp(TimeUnit::Nanosecond, None), + DataType::Utf8, + ]), + ], + Volatility::Immutable, + ); + Self { signature } + } +} + +impl ScalarUDFImpl for UtcTimestampToStrUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "utc_timestamp_to_str" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type( + &self, + _arg_types: &[DataType], + ) -> vegafusion_common::datafusion_common::Result { + Ok(DataType::Utf8) + } + + fn invoke( + &self, + args: &[ColumnarValue], + ) -> vegafusion_common::datafusion_common::Result { // Argument order // [0] data array let timestamp_array = match &args[0] { @@ -74,29 +120,10 @@ fn make_utc_timestamp_to_str_udf() -> ScalarUDF { } else { ScalarValue::try_from_array(&formatted, 0).map(ColumnarValue::Scalar) } - }); - - let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Utf8))); - - let signature = Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Date32, DataType::Utf8]), - TypeSignature::Exact(vec![DataType::Date64, DataType::Utf8]), - TypeSignature::Exact(vec![ - DataType::Timestamp(TimeUnit::Millisecond, None), - DataType::Utf8, - ]), - TypeSignature::Exact(vec![ - DataType::Timestamp(TimeUnit::Nanosecond, None), - DataType::Utf8, - ]), - ], - Volatility::Immutable, - ); - - ScalarUDF::new("utc_timestamp_to_str", &signature, &return_type, &scalar_fn) + } } lazy_static! { - pub static ref UTC_TIMESTAMP_TO_STR_UDF: ScalarUDF = make_utc_timestamp_to_str_udf(); + pub static ref UTC_TIMESTAMP_TO_STR_UDF: ScalarUDF = + ScalarUDF::from(UtcTimestampToStrUDF::new()); } diff --git a/vegafusion-sql/src/compile/scalar.rs b/vegafusion-sql/src/compile/scalar.rs index c8e8c154..8b29f5c2 100644 --- a/vegafusion-sql/src/compile/scalar.rs +++ b/vegafusion-sql/src/compile/scalar.rs @@ -5,13 +5,14 @@ use arrow::datatypes::{DataType, TimeUnit}; use datafusion_common::scalar::ScalarValue; use datafusion_common::DFSchema; use datafusion_expr::{ - expr, lit, ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionDefinition, - ScalarFunctionImplementation, ScalarUDF, Signature, Volatility, + expr, lit, ColumnarValue, Expr, ScalarFunctionDefinition, ScalarUDF, ScalarUDFImpl, Signature, + Volatility, }; use sqlparser::ast::{ Expr as SqlExpr, Function as SqlFunction, FunctionArg as SqlFunctionArg, FunctionArgExpr, Ident, ObjectName as SqlObjectName, Value as SqlValue, }; +use std::any::Any; use std::ops::Add; use std::sync::Arc; use vegafusion_common::data::scalar::ArrayRefHelpers; @@ -260,26 +261,50 @@ impl ToSqlScalar for ScalarValue { fn ms_to_timestamp(v: i64, dialect: &Dialect) -> Result { // Hack to recursively transform the epoch_ms_to_utc_timestamp - let return_type: ReturnTypeFunction = - Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Millisecond, None)))); - let signature: Signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable); - let scalar_fn: ScalarFunctionImplementation = Arc::new(move |_args: &[ColumnarValue]| { - panic!("Placeholder UDF implementation should not be called") - }); - - let udf = ScalarUDF::new( - "epoch_ms_to_utc_timestamp", - &signature, - &return_type, - &scalar_fn, - ); Expr::ScalarFunction(expr::ScalarFunction { - func_def: ScalarFunctionDefinition::UDF(Arc::new(udf)), + func_def: ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from( + EpochMsToUtcTimestampUDF::new(), + ))), args: vec![lit(v)], }) .to_sql(dialect, &DFSchema::empty()) } +// Hack to recursively transform the epoch_ms_to_utc_timestamp +#[derive(Debug, Clone)] +pub struct EpochMsToUtcTimestampUDF { + signature: Signature, +} + +impl EpochMsToUtcTimestampUDF { + pub fn new() -> Self { + let signature: Signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable); + Self { signature } + } +} + +impl ScalarUDFImpl for EpochMsToUtcTimestampUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "epoch_ms_to_utc_timestamp" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result { + Ok(DataType::Timestamp(TimeUnit::Millisecond, None)) + } + + fn invoke(&self, _args: &[ColumnarValue]) -> datafusion_common::Result { + panic!("Placeholder UDF implementation should not be called") + } +} + fn date32_to_date(days: &Option, dialect: &Dialect) -> Result { let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); match days {