Skip to content

Commit

Permalink
Convert remaining UDFs to use ScalarUDFImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmmease committed Feb 24, 2024
1 parent 65f226f commit e2b368a
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl ScalarUDFImpl for StrToUtcTimestampUDF {

fn return_type(
&self,
arg_types: &[DataType],
_arg_types: &[DataType],
) -> vegafusion_common::datafusion_common::Result<DataType> {
Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
}
Expand Down
62 changes: 44 additions & 18 deletions vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,60 @@
use chrono::TimeZone;
use chrono::{NaiveDateTime, Timelike};
use chrono_tz::Tz;
use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_common::arrow::array::Array;
use vegafusion_common::datafusion_expr::ScalarUDFImpl;
use vegafusion_common::{
arrow::{
array::{ArrayRef, TimestampMillisecondArray},
compute::cast,
datatypes::{DataType, TimeUnit},
},
datafusion_common::{DataFusionError, ScalarValue},
datafusion_expr::{
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
Volatility,
},
datafusion_expr::{ColumnarValue, ScalarUDF, Signature, Volatility},
};

fn make_to_utc_timestamp_udf() -> ScalarUDF {
let scalar_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| {
#[derive(Debug, Clone)]
pub struct ToUtcTimestampUDF {
signature: Signature,
}

impl ToUtcTimestampUDF {
pub fn new() -> Self {
// Signature should be (Timestamp, UTF8), but specifying Timestamp in the signature
// requires specifying the timezone explicitly, and DataFusion doesn't currently
// coerce between timezones.
let signature: Signature = Signature::any(2, Volatility::Immutable);
Self { signature }
}
}

impl ScalarUDFImpl for ToUtcTimestampUDF {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"to_utc_timestamp"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(
&self,
_arg_types: &[DataType],
) -> vegafusion_common::datafusion_common::Result<DataType> {
Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
}

fn invoke(
&self,
args: &[ColumnarValue],
) -> vegafusion_common::datafusion_common::Result<ColumnarValue> {
// [0] data array
let timestamp_array = match &args[0] {
ColumnarValue::Array(array) => array.clone(),
Expand All @@ -45,17 +81,7 @@ fn make_to_utc_timestamp_udf() -> ScalarUDF {
} else {
ScalarValue::try_from_array(&result_array, 0).map(ColumnarValue::Scalar)
}
});

let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Millisecond, None))));

// Signature should be (Timestamp, UTF8), but specifying Timestamp in the signature
// requires specifying the timezone explicitly, and DataFusion doesn't currently
// coerce between timezones.
let signature: Signature = Signature::any(2, Volatility::Immutable);

ScalarUDF::new("to_utc_timestamp", &signature, &return_type, &scalar_fn)
}
}

pub fn to_utc_timestamp(timestamp_array: ArrayRef, tz: Tz) -> Result<ArrayRef, DataFusionError> {
Expand Down Expand Up @@ -127,5 +153,5 @@ pub fn to_timestamp_ms(array: &ArrayRef) -> Result<ArrayRef, DataFusionError> {
}

lazy_static! {
pub static ref TO_UTC_TIMESTAMP_UDF: ScalarUDF = make_to_utc_timestamp_udf();
pub static ref TO_UTC_TIMESTAMP_UDF: ScalarUDF = ScalarUDF::from(ToUtcTimestampUDF::new());
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,59 @@
use crate::udfs::datetime::to_utc_timestamp::to_timestamp_ms;
use std::sync::Arc;
use vegafusion_common::datafusion_expr::TypeSignature;
use std::any::Any;
use vegafusion_common::datafusion_expr::{ScalarUDFImpl, TypeSignature};
use vegafusion_common::{
arrow::{
compute::cast,
datatypes::{DataType, TimeUnit},
},
datafusion_common::ScalarValue,
datafusion_expr::{
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
Volatility,
},
datafusion_expr::{ColumnarValue, ScalarUDF, Signature, Volatility},
};

fn make_utc_timestamp_to_epoch_ms_udf() -> ScalarUDF {
let time_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| {
#[derive(Debug, Clone)]
pub struct UtcTimestampToEpochUDF {
signature: Signature,
}

impl UtcTimestampToEpochUDF {
pub fn new() -> Self {
let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32]),
TypeSignature::Exact(vec![DataType::Date64]),
TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Millisecond, None)]),
TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Nanosecond, None)]),
],
Volatility::Immutable,
);
Self { signature }
}
}

impl ScalarUDFImpl for UtcTimestampToEpochUDF {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"utc_timestamp_to_epoch_ms"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(
&self,
_arg_types: &[DataType],
) -> vegafusion_common::datafusion_common::Result<DataType> {
Ok(DataType::Int64)
}

fn invoke(
&self,
args: &[ColumnarValue],
) -> vegafusion_common::datafusion_common::Result<ColumnarValue> {
// [0] data array
let data_array = match &args[0] {
ColumnarValue::Array(array) => array.clone(),
Expand All @@ -31,27 +70,10 @@ fn make_utc_timestamp_to_epoch_ms_udf() -> ScalarUDF {
} else {
ScalarValue::try_from_array(&result_array, 0).map(ColumnarValue::Scalar)
}
});

let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Int64)));
let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32]),
TypeSignature::Exact(vec![DataType::Date64]),
TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Millisecond, None)]),
TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Nanosecond, None)]),
],
Volatility::Immutable,
);

ScalarUDF::new(
"utc_timestamp_to_epoch_ms",
&signature,
&return_type,
&time_fn,
)
}
}

lazy_static! {
pub static ref UTC_TIMESTAMP_TO_EPOCH_MS: ScalarUDF = make_utc_timestamp_to_epoch_ms_udf();
pub static ref UTC_TIMESTAMP_TO_EPOCH_MS: ScalarUDF =
ScalarUDF::from(UtcTimestampToEpochUDF::new());
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,67 @@
use crate::udfs::datetime::from_utc_timestamp::from_utc_timestamp;
use crate::udfs::datetime::to_utc_timestamp::to_timestamp_ms;
use chrono::NaiveDateTime;
use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_common::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue};
use vegafusion_common::datafusion_expr::TypeSignature;
use vegafusion_common::datafusion_expr::{ScalarUDFImpl, TypeSignature};
use vegafusion_common::{
arrow::datatypes::{DataType, TimeUnit},
datafusion_expr::{
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
Volatility,
},
datafusion_expr::{ColumnarValue, ScalarUDF, Signature, Volatility},
};

fn make_utc_timestamp_to_str_udf() -> ScalarUDF {
let scalar_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| {
#[derive(Debug, Clone)]
pub struct UtcTimestampToStrUDF {
signature: Signature,
}

impl UtcTimestampToStrUDF {
pub fn new() -> Self {
let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Date64, DataType::Utf8]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8,
]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Utf8,
]),
],
Volatility::Immutable,
);
Self { signature }
}
}

impl ScalarUDFImpl for UtcTimestampToStrUDF {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"utc_timestamp_to_str"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(
&self,
_arg_types: &[DataType],
) -> vegafusion_common::datafusion_common::Result<DataType> {
Ok(DataType::Utf8)
}

fn invoke(
&self,
args: &[ColumnarValue],
) -> vegafusion_common::datafusion_common::Result<ColumnarValue> {
// Argument order
// [0] data array
let timestamp_array = match &args[0] {
Expand Down Expand Up @@ -74,29 +120,10 @@ fn make_utc_timestamp_to_str_udf() -> ScalarUDF {
} else {
ScalarValue::try_from_array(&formatted, 0).map(ColumnarValue::Scalar)
}
});

let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Utf8)));

let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Date64, DataType::Utf8]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8,
]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Utf8,
]),
],
Volatility::Immutable,
);

ScalarUDF::new("utc_timestamp_to_str", &signature, &return_type, &scalar_fn)
}
}

lazy_static! {
pub static ref UTC_TIMESTAMP_TO_STR_UDF: ScalarUDF = make_utc_timestamp_to_str_udf();
pub static ref UTC_TIMESTAMP_TO_STR_UDF: ScalarUDF =
ScalarUDF::from(UtcTimestampToStrUDF::new());
}
57 changes: 41 additions & 16 deletions vegafusion-sql/src/compile/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::DFSchema;
use datafusion_expr::{
expr, lit, ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionDefinition,
ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
expr, lit, ColumnarValue, Expr, ScalarFunctionDefinition, ScalarUDF, ScalarUDFImpl, Signature,
Volatility,
};
use sqlparser::ast::{
Expr as SqlExpr, Function as SqlFunction, FunctionArg as SqlFunctionArg, FunctionArgExpr,
Ident, ObjectName as SqlObjectName, Value as SqlValue,
};
use std::any::Any;
use std::ops::Add;
use std::sync::Arc;
use vegafusion_common::data::scalar::ArrayRefHelpers;
Expand Down Expand Up @@ -260,26 +261,50 @@ impl ToSqlScalar for ScalarValue {

fn ms_to_timestamp(v: i64, dialect: &Dialect) -> Result<SqlExpr> {
// Hack to recursively transform the epoch_ms_to_utc_timestamp
let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Millisecond, None))));
let signature: Signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable);
let scalar_fn: ScalarFunctionImplementation = Arc::new(move |_args: &[ColumnarValue]| {
panic!("Placeholder UDF implementation should not be called")
});

let udf = ScalarUDF::new(
"epoch_ms_to_utc_timestamp",
&signature,
&return_type,
&scalar_fn,
);
Expr::ScalarFunction(expr::ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(Arc::new(udf)),
func_def: ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from(
EpochMsToUtcTimestampUDF::new(),
))),
args: vec![lit(v)],
})
.to_sql(dialect, &DFSchema::empty())
}

// Hack to recursively transform the epoch_ms_to_utc_timestamp
#[derive(Debug, Clone)]
pub struct EpochMsToUtcTimestampUDF {
signature: Signature,
}

impl EpochMsToUtcTimestampUDF {
pub fn new() -> Self {
let signature: Signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable);
Self { signature }
}
}

impl ScalarUDFImpl for EpochMsToUtcTimestampUDF {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"epoch_ms_to_utc_timestamp"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
}

fn invoke(&self, _args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
panic!("Placeholder UDF implementation should not be called")
}
}

fn date32_to_date(days: &Option<i32>, dialect: &Dialect) -> Result<SqlExpr> {
let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
match days {
Expand Down

0 comments on commit e2b368a

Please sign in to comment.