Skip to content

Commit

Permalink
All timestamp UDFs should accept nanoseconds / Date32 / Date64 (#257)
Browse files Browse the repository at this point in the history
* All timestamp UDFs should accept nanoseconds

* Date32 and Date64 should be allowed everywhere timestamps are accepted

* Allow casting to timestamp in duckdb and postgres

* More Date32/Date64 support

* Cast to timestamp for other dialects
  • Loading branch information
jonmmease authored Mar 17, 2023
1 parent 7db62c9 commit 97b26f1
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 20 deletions.
31 changes: 26 additions & 5 deletions vegafusion-datafusion-udfs/src/udfs/datetime/date_add_tz.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use vegafusion_common::datafusion_expr::TypeSignature;
use vegafusion_common::{
arrow::datatypes::{DataType, TimeUnit},
datafusion_expr::{
Expand All @@ -15,12 +16,32 @@ fn make_date_add_tz_udf() -> ScalarUDF {
let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Millisecond, None))));

let signature = Signature::exact(
let signature = Signature::one_of(
vec![
DataType::Utf8,
DataType::Int32,
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8,
TypeSignature::Exact(vec![
DataType::Utf8,
DataType::Int32,
DataType::Date32,
DataType::Utf8,
]),
TypeSignature::Exact(vec![
DataType::Utf8,
DataType::Int32,
DataType::Date64,
DataType::Utf8,
]),
TypeSignature::Exact(vec![
DataType::Utf8,
DataType::Int32,
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8,
]),
TypeSignature::Exact(vec![
DataType::Utf8,
DataType::Int32,
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Utf8,
]),
],
Volatility::Immutable,
);
Expand Down
29 changes: 25 additions & 4 deletions vegafusion-datafusion-udfs/src/udfs/datetime/date_part_tz.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::udfs::datetime::from_utc_timestamp::from_utc_timestamp;
use crate::udfs::datetime::to_utc_timestamp::to_timestamp_ms;
use datafusion_physical_expr::datetime_expressions;
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_common::datafusion_expr::TypeSignature;
use vegafusion_common::{
arrow::datatypes::{DataType, TimeUnit},
datafusion_common::DataFusionError,
Expand All @@ -19,6 +21,8 @@ fn make_date_part_tz_udf() -> ScalarUDF {
ColumnarValue::Scalar(scalar) => scalar.to_array(),
};

let timestamp_array = to_timestamp_ms(&timestamp_array)?;

// [2] timezone string
let tz_str = if let ColumnarValue::Scalar(default_input_tz) = &args[2] {
default_input_tz.to_string()
Expand Down Expand Up @@ -48,11 +52,28 @@ fn make_date_part_tz_udf() -> ScalarUDF {

let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Float64)));

let signature = Signature::exact(
let signature = Signature::one_of(
vec![
DataType::Utf8, // part
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8, // timezone
TypeSignature::Exact(vec![
DataType::Utf8, // part
DataType::Date32,
DataType::Utf8, // timezone
]),
TypeSignature::Exact(vec![
DataType::Utf8, // part
DataType::Date64,
DataType::Utf8, // timezone
]),
TypeSignature::Exact(vec![
DataType::Utf8, // part
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8, // timezone
]),
TypeSignature::Exact(vec![
DataType::Utf8, // part
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Utf8, // timezone
]),
],
Volatility::Immutable,
);
Expand Down
26 changes: 22 additions & 4 deletions vegafusion-datafusion-udfs/src/udfs/datetime/date_trunc_tz.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use vegafusion_common::datafusion_expr::TypeSignature;
use vegafusion_common::{
arrow::datatypes::{DataType, TimeUnit},
datafusion_expr::{
Expand All @@ -15,11 +16,28 @@ fn make_date_trunc_tz_udf() -> ScalarUDF {
let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Millisecond, None))));

let signature = Signature::exact(
let signature = Signature::one_of(
vec![
DataType::Utf8, // part
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8, // timezone
TypeSignature::Exact(vec![
DataType::Utf8, // part
DataType::Date32,
DataType::Utf8, // timezone
]),
TypeSignature::Exact(vec![
DataType::Utf8, // part
DataType::Date64,
DataType::Utf8, // timezone
]),
TypeSignature::Exact(vec![
DataType::Utf8, // part
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8, // timezone
]),
TypeSignature::Exact(vec![
DataType::Utf8, // part
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Utf8, // timezone
]),
],
Volatility::Immutable,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ fn make_format_timestamp_udf() -> ScalarUDF {

let signature: Signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Date64, DataType::Utf8]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ fn make_from_utc_timestamp() -> ScalarUDF {

let signature: Signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Date64, DataType::Utf8]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ pub fn to_timestamp_ms(array: &ArrayRef) -> Result<ArrayRef, DataFusionError> {
)?)
}
}
DataType::Date32 => Ok(cast(
array,
&DataType::Timestamp(TimeUnit::Millisecond, None),
)?),
DataType::Date64 => Ok(cast(
array,
&DataType::Timestamp(TimeUnit::Millisecond, None),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::udfs::datetime::to_utc_timestamp::to_timestamp_ms;
use std::sync::Arc;
use vegafusion_common::datafusion_expr::TypeSignature;
use vegafusion_common::{
arrow::{
compute::cast,
Expand All @@ -18,6 +20,7 @@ fn make_utc_timestamp_to_epoch_ms_udf() -> ScalarUDF {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array(),
};
let data_array = to_timestamp_ms(&data_array)?;

// cast timestamp millis to Int64
let result_array = cast(&data_array, &DataType::Int64)?;
Expand All @@ -31,8 +34,13 @@ fn make_utc_timestamp_to_epoch_ms_udf() -> ScalarUDF {
});

let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Int64)));
let signature: Signature = Signature::exact(
vec![DataType::Timestamp(TimeUnit::Millisecond, None)],
let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32]),
TypeSignature::Exact(vec![DataType::Date64]),
TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Millisecond, None)]),
TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Nanosecond, None)]),
],
Volatility::Immutable,
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ fn make_utc_timestamp_to_str_udf() -> ScalarUDF {

let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Date64, DataType::Utf8]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Utf8,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::task_graph::timezone::RuntimeTzConfig;
use datafusion_expr::{lit, Expr, ExprSchemable};
use datafusion_expr::{expr, lit, Expr, ExprSchemable};
use std::sync::Arc;
use vegafusion_common::arrow::datatypes::DataType;
use vegafusion_common::datafusion_common::{DFSchema, ScalarValue};
use vegafusion_common::datatypes::{cast_to, is_numeric_datatype};
use vegafusion_core::arrow::datatypes::TimeUnit;
use vegafusion_core::error::{Result, VegaFusionError};
use vegafusion_datafusion_udfs::udfs::datetime::epoch_to_utc_timestamp::EPOCH_MS_TO_UTC_TIMESTAMP_UDF;
use vegafusion_datafusion_udfs::udfs::datetime::format_timestamp::FORMAT_TIMESTAMP_UDF;
Expand Down Expand Up @@ -91,6 +92,14 @@ pub fn utc_format_fn(

fn to_timestamptz_expr(arg: &Expr, schema: &DFSchema, default_input_tz: &str) -> Result<Expr> {
Ok(match arg.get_type(schema)? {
DataType::Date32 => Expr::Cast(expr::Cast {
expr: Box::new(arg.clone()),
data_type: DataType::Timestamp(TimeUnit::Millisecond, None),
}),
DataType::Date64 => Expr::Cast(expr::Cast {
expr: Box::new(arg.clone()),
data_type: DataType::Timestamp(TimeUnit::Millisecond, None),
}),
DataType::Timestamp(_, _) => arg.clone(),
DataType::Utf8 => Expr::ScalarUDF {
fun: Arc::new((*STR_TO_UTC_TIMESTAMP_UDF).clone()),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::expression::compiler::call::TzTransformFn;
use crate::task_graph::timezone::RuntimeTzConfig;
use datafusion_expr::{floor, lit, Expr, ExprSchemable};
use datafusion_expr::{expr, floor, lit, Expr, ExprSchemable};
use std::sync::Arc;
use vegafusion_common::arrow::datatypes::DataType;
use vegafusion_common::arrow::datatypes::{DataType, TimeUnit};
use vegafusion_common::datafusion_common::DFSchema;
use vegafusion_common::datatypes::{cast_to, is_numeric_datatype};
use vegafusion_core::error::{Result, VegaFusionError};
Expand Down Expand Up @@ -64,6 +64,14 @@ fn extract_timestamp_arg(
) -> Result<Expr> {
if let Some(arg) = args.get(0) {
Ok(match arg.get_type(schema)? {
DataType::Date32 => Expr::Cast(expr::Cast {
expr: Box::new(arg.clone()),
data_type: DataType::Timestamp(TimeUnit::Millisecond, None),
}),
DataType::Date64 => Expr::Cast(expr::Cast {
expr: Box::new(arg.clone()),
data_type: DataType::Timestamp(TimeUnit::Millisecond, None),
}),
DataType::Timestamp(_, _) => arg.clone(),
DataType::Utf8 => Expr::ScalarUDF {
fun: Arc::new((*STR_TO_UTC_TIMESTAMP_UDF).clone()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn time_fn(tz_config: &RuntimeTzConfig, args: &[Expr], schema: &DFSchema) ->

// Dispatch handling on data type
let expr = match arg.get_type(schema)? {
DataType::Timestamp(_, _) => Expr::ScalarUDF {
DataType::Timestamp(_, _) | DataType::Date32 | DataType::Date64 => Expr::ScalarUDF {
fun: Arc::new((*UTC_TIMESTAMP_TO_EPOCH_MS).clone()),
args: vec![arg.clone()],
},
Expand Down
38 changes: 37 additions & 1 deletion vegafusion-sql/src/dialect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::dialect::transforms::utc_timestamp_to_str::{
UtcTimestampToStrDuckDBTransformer, UtcTimestampToStrPostgresTransformer,
UtcTimestampToStrSnowflakeTransformer,
};
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::DFSchema;
use datafusion_expr::{lit, ExprSchemable};
Expand Down Expand Up @@ -329,6 +329,10 @@ impl Dialect {
(DataType::Float32, SqlDataType::Double),
(DataType::Float64, SqlDataType::Double),
(DataType::Utf8, SqlDataType::Varchar(None)),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down Expand Up @@ -450,6 +454,10 @@ impl Dialect {
(DataType::Float32, float64dtype.clone()),
(DataType::Float64, float64dtype),
(DataType::Utf8, SqlDataType::String),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down Expand Up @@ -544,6 +552,10 @@ impl Dialect {
(DataType::Float32, SqlDataType::Float(None)),
(DataType::Float64, SqlDataType::Double),
(DataType::Utf8, SqlDataType::Varchar(None)),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down Expand Up @@ -681,6 +693,10 @@ impl Dialect {
(DataType::Float32, SqlDataType::Float(None)),
(DataType::Float64, SqlDataType::Double),
(DataType::Utf8, SqlDataType::String),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down Expand Up @@ -820,6 +836,10 @@ impl Dialect {
(DataType::Float32, SqlDataType::Float(None)),
(DataType::Float64, SqlDataType::Double),
(DataType::Utf8, SqlDataType::String),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down Expand Up @@ -969,6 +989,10 @@ impl Dialect {
(DataType::Float32, SqlDataType::Float(None)),
(DataType::Float64, SqlDataType::Double),
(DataType::Utf8, SqlDataType::Varchar(None)),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down Expand Up @@ -1213,6 +1237,10 @@ impl Dialect {
(DataType::Float32, SqlDataType::Real),
(DataType::Float64, SqlDataType::DoublePrecision),
(DataType::Utf8, SqlDataType::Text),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down Expand Up @@ -1339,6 +1367,10 @@ impl Dialect {
(DataType::Float32, SqlDataType::Real),
(DataType::Float64, SqlDataType::DoublePrecision),
(DataType::Utf8, SqlDataType::Text),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down Expand Up @@ -1475,6 +1507,10 @@ impl Dialect {
(DataType::Float32, SqlDataType::Float(None)),
(DataType::Float64, SqlDataType::Double),
(DataType::Utf8, SqlDataType::Varchar(None)),
(
DataType::Timestamp(TimeUnit::Millisecond, None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
),
]
.into_iter()
.collect(),
Expand Down

0 comments on commit 97b26f1

Please sign in to comment.