Skip to content

Commit

Permalink
Support timestamps and steps of less than a day for range/generate_se…
Browse files Browse the repository at this point in the history
…ries (#12400)

* Support timestamps and steps of less than a day for timestamps.

* Updated docs for range and generate_series to add additional info wrt timestamp support.

* Updates based on code review.

* Cleanup error message

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
Omega359 and alamb authored Sep 12, 2024
1 parent 5b6b404 commit 1f06308
Show file tree
Hide file tree
Showing 3 changed files with 322 additions and 30 deletions.
179 changes: 165 additions & 14 deletions datafusion/functions-nested/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,31 @@
//! [`ScalarUDFImpl`] definitions for range and gen_series functions.
use crate::utils::make_scalar_function;
use arrow::array::{Array, ArrayRef, Date32Builder, Int64Array, ListArray, ListBuilder};
use arrow::array::{Array, ArrayRef, Int64Array, ListArray, ListBuilder};
use arrow::datatypes::{DataType, Field};
use arrow_array::types::{Date32Type, IntervalMonthDayNanoType};
use arrow_array::NullArray;
use arrow_array::builder::{Date32Builder, TimestampNanosecondBuilder};
use arrow_array::temporal_conversions::as_datetime_with_timezone;
use arrow_array::timezone::Tz;
use arrow_array::types::{
Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType as TSNT,
};
use arrow_array::{NullArray, TimestampNanosecondArray};
use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer};
use arrow_schema::DataType::*;
use arrow_schema::IntervalUnit::MonthDayNano;
use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array};
use datafusion_common::{exec_err, not_impl_datafusion_err, Result};
use arrow_schema::TimeUnit::Nanosecond;
use datafusion_common::cast::{
as_date32_array, as_int64_array, as_interval_mdn_array, as_timestamp_nanosecond_array,
};
use datafusion_common::{
exec_datafusion_err, exec_err, internal_err, not_impl_datafusion_err, Result,
};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use itertools::Itertools;
use std::any::Any;
use std::cmp::Ordering;
use std::iter::from_fn;
use std::str::FromStr;
use std::sync::Arc;

make_udf_expr_and_func!(
Expand Down Expand Up @@ -78,7 +90,7 @@ impl ScalarUDFImpl for Range {
UInt16 => Ok(Int64),
UInt32 => Ok(Int64),
UInt64 => Ok(Int64),
Timestamp(_, _) => Ok(Date32),
Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
Date32 => Ok(Date32),
Date64 => Ok(Date32),
Utf8 => Ok(Date32),
Expand Down Expand Up @@ -109,8 +121,11 @@ impl ScalarUDFImpl for Range {
match args[0].data_type() {
Int64 => make_scalar_function(|args| gen_range_inner(args, false))(args),
Date32 => make_scalar_function(|args| gen_range_date(args, false))(args),
_ => {
exec_err!("unsupported type for range")
Timestamp(_, _) => {
make_scalar_function(|args| gen_range_timestamp(args, false))(args)
}
dt => {
exec_err!("unsupported type for RANGE. Expected Int64, Date32 or Timestamp, got: {dt}")
}
}
}
Expand Down Expand Up @@ -152,8 +167,8 @@ impl ScalarUDFImpl for GenSeries {
&self.signature
}

fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
_arg_types
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
arg_types
.iter()
.map(|arg_type| match arg_type {
Null => Ok(Null),
Expand All @@ -165,7 +180,7 @@ impl ScalarUDFImpl for GenSeries {
UInt16 => Ok(Int64),
UInt32 => Ok(Int64),
UInt64 => Ok(Int64),
Timestamp(_, _) => Ok(Date32),
Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
Date32 => Ok(Date32),
Date64 => Ok(Date32),
Utf8 => Ok(Date32),
Expand Down Expand Up @@ -196,9 +211,12 @@ impl ScalarUDFImpl for GenSeries {
match args[0].data_type() {
Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args),
Date32 => make_scalar_function(|args| gen_range_date(args, true))(args),
Timestamp(_, _) => {
make_scalar_function(|args| gen_range_timestamp(args, true))(args)
}
dt => {
exec_err!(
"unsupported type for gen_series. Expected Int64 or Date32, got: {}",
"unsupported type for GENERATE_SERIES. Expected Int64, Date32 or Timestamp, got: {}",
dt
)
}
Expand Down Expand Up @@ -334,7 +352,7 @@ fn gen_range_iter(
}
}

fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> {
fn gen_range_date(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> {
if args.len() != 3 {
return exec_err!("arguments length does not match");
}
Expand Down Expand Up @@ -372,7 +390,7 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> {
}

let neg = months < 0 || days < 0;
if !include_upper {
if !include_upper_bound {
stop = Date32Type::subtract_month_day_nano(stop, step);
}
let mut new_date = start;
Expand All @@ -394,3 +412,136 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> {

Ok(arr)
}

fn gen_range_timestamp(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> {
if args.len() != 3 {
return exec_err!(
"Arguments length must be 3 for {}",
if include_upper_bound {
"GENERATE_SERIES"
} else {
"RANGE"
}
);
}

// coerce_types fn should coerce all types to Timestamp(Nanosecond, tz)
let (start_arr, start_tz_opt) = cast_timestamp_arg(&args[0], include_upper_bound)?;
let (stop_arr, stop_tz_opt) = cast_timestamp_arg(&args[1], include_upper_bound)?;
let step_arr = as_interval_mdn_array(&args[2])?;
let start_tz = parse_tz(start_tz_opt)?;
let stop_tz = parse_tz(stop_tz_opt)?;

// values are timestamps
let values_builder = start_tz_opt
.clone()
.map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
});
let mut list_builder = ListBuilder::new(values_builder);

for idx in 0..start_arr.len() {
if start_arr.is_null(idx) || stop_arr.is_null(idx) || step_arr.is_null(idx) {
list_builder.append_null();
continue;
}

let start = start_arr.value(idx);
let stop = stop_arr.value(idx);
let step = step_arr.value(idx);

let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
if months == 0 && days == 0 && ns == 0 {
return exec_err!(
"Interval argument to {} must not be 0",
if include_upper_bound {
"GENERATE_SERIES"
} else {
"RANGE"
}
);
}

let neg = TSNT::add_month_day_nano(start, step, start_tz)
.ok_or(exec_datafusion_err!(
"Cannot generate timestamp range where start + step overflows"
))?
.cmp(&start)
== Ordering::Less;

let stop_dt = as_datetime_with_timezone::<TSNT>(stop, stop_tz).ok_or(
exec_datafusion_err!(
"Cannot generate timestamp for stop: {}: {:?}",
stop,
stop_tz
),
)?;

let mut current = start;
let mut current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz).ok_or(
exec_datafusion_err!(
"Cannot generate timestamp for start: {}: {:?}",
current,
start_tz
),
)?;

let values = from_fn(|| {
if (include_upper_bound
&& ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt)))
|| (!include_upper_bound
&& ((neg && current_dt <= stop_dt)
|| (!neg && current_dt >= stop_dt)))
{
return None;
}

let prev_current = current;

if let Some(ts) = TSNT::add_month_day_nano(current, step, start_tz) {
current = ts;
current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz)?;

Some(Some(prev_current))
} else {
// we failed to parse the timestamp here so terminate the series
None
}
});

list_builder.append_value(values);
}

let arr = Arc::new(list_builder.finish());

Ok(arr)
}

fn cast_timestamp_arg(
arg: &ArrayRef,
include_upper: bool,
) -> Result<(&TimestampNanosecondArray, &Option<Arc<str>>)> {
match arg.data_type() {
Timestamp(Nanosecond, tz_opt) => {
Ok((as_timestamp_nanosecond_array(arg)?, tz_opt))
}
_ => {
internal_err!(
"Unexpected argument type for {} : {}",
if include_upper {
"GENERATE_SERIES"
} else {
"RANGE"
},
arg.data_type()
)
}
}
}

fn parse_tz(tz: &Option<Arc<str>>) -> Result<Tz> {
let tz = tz.as_ref().map_or_else(|| "+00", |s| s);

Tz::from_str(tz)
.map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
}
Loading

0 comments on commit 1f06308

Please sign in to comment.