diff --git a/vegafusion-datafusion-udfs/src/udfs/datetime/date_to_utc_timestamp.rs b/vegafusion-datafusion-udfs/src/udfs/datetime/date_to_utc_timestamp.rs index ff00ecb9..28b98a3d 100644 --- a/vegafusion-datafusion-udfs/src/udfs/datetime/date_to_utc_timestamp.rs +++ b/vegafusion-datafusion-udfs/src/udfs/datetime/date_to_utc_timestamp.rs @@ -2,11 +2,12 @@ use chrono::{NaiveDateTime, TimeZone}; use std::any::Any; use std::str::FromStr; use std::sync::Arc; +use vegafusion_common::arrow::compute::try_unary; +use vegafusion_common::arrow::error::ArrowError; use vegafusion_common::datafusion_expr::ScalarUDFImpl; use vegafusion_common::{ arrow::{ array::{ArrayRef, Date32Array, TimestampMillisecondArray}, - compute::unary, datatypes::{DataType, TimeUnit}, }, datafusion_common::{DataFusionError, ScalarValue}, @@ -73,7 +74,7 @@ impl ScalarUDFImpl for DateToUtcTimestampUDF { let s_per_day = 60 * 60 * 24_i64; let date_array = date_array.as_any().downcast_ref::().unwrap(); - let timestamp_array: TimestampMillisecondArray = unary(date_array, |v| { + let timestamp_array: TimestampMillisecondArray = try_unary(date_array, |v| { // Build naive datetime for time let seconds = (v as i64) * s_per_day; let nanoseconds = 0_u32; @@ -84,11 +85,11 @@ impl ScalarUDFImpl for DateToUtcTimestampUDF { let local_datetime = tz .from_local_datetime(&naive_local_datetime) .earliest() - .unwrap(); + .ok_or(ArrowError::ComputeError("date out of bounds".to_string()))?; // Get timestamp millis (in UTC) - local_datetime.timestamp_millis() - }); + Ok(local_datetime.timestamp_millis()) + })?; let timestamp_array = Arc::new(timestamp_array) as ArrayRef; // maybe back to scalar diff --git a/vegafusion-datafusion-udfs/src/udfs/datetime/str_to_utc_timestamp.rs b/vegafusion-datafusion-udfs/src/udfs/datetime/str_to_utc_timestamp.rs index bb8d633e..be2790d2 100644 --- a/vegafusion-datafusion-udfs/src/udfs/datetime/str_to_utc_timestamp.rs +++ b/vegafusion-datafusion-udfs/src/udfs/datetime/str_to_utc_timestamp.rs @@ -80,7 +80,7 @@ pub fn parse_datetime( dt } else { // Handle positive timezone transition by adding 1 hour - let datetime = datetime.with_hour(datetime.hour() + 1).unwrap(); + let datetime = datetime.with_hour(datetime.hour() + 1)?; local_tz.from_local_datetime(&datetime).earliest()? }; let dt_utc = dt.with_timezone(&chrono::Utc); diff --git a/vegafusion-datafusion-udfs/src/udfs/datetime/timeunit.rs b/vegafusion-datafusion-udfs/src/udfs/datetime/timeunit.rs index 517d83ad..1a01a182 100644 --- a/vegafusion-datafusion-udfs/src/udfs/datetime/timeunit.rs +++ b/vegafusion-datafusion-udfs/src/udfs/datetime/timeunit.rs @@ -4,8 +4,9 @@ use std::any::Any; use std::str::FromStr; use std::sync::Arc; use vegafusion_common::arrow::array::{ArrayRef, Int64Array, TimestampMillisecondArray}; -use vegafusion_common::arrow::compute::unary; +use vegafusion_common::arrow::compute::try_unary; use vegafusion_common::arrow::datatypes::{DataType, TimeUnit}; +use vegafusion_common::arrow::error::ArrowError; use vegafusion_common::arrow::temporal_conversions::date64_to_datetime; use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue}; use vegafusion_common::datafusion_expr::{ @@ -66,36 +67,52 @@ fn perform_timeunit_start_from_utc( value: i64, units_mask: &[bool], in_tz: T, -) -> DateTime { +) -> Result, ArrowError> { // Load and interpret date time as UTC let dt_value = date64_to_datetime(value) - .unwrap() - .with_nanosecond(0) - .unwrap(); - let dt_value = Utc.from_local_datetime(&dt_value).earliest().unwrap(); + .and_then(|d| d.with_nanosecond(0)) + .ok_or(ArrowError::ComputeError("date out of bounds".to_string()))?; + + let dt_value = + Utc.from_local_datetime(&dt_value) + .earliest() + .ok_or(ArrowError::ComputeError( + "Failed to convert to UTC".to_string(), + ))?; + let mut dt_value = dt_value.with_timezone(&in_tz); // Handle time truncation if !units_mask[7] { // Clear hours first to avoid any of the other time truncations from landing on a daylight // savings boundary - dt_value = dt_value.with_hour(0).unwrap(); + dt_value = dt_value + .with_hour(0) + .ok_or(ArrowError::ComputeError("Failed to drop hours".to_string()))?; } if !units_mask[10] { // Milliseconds let new_ns = (((dt_value.nanosecond() as f64) / 1e6).floor() * 1e6) as u32; - dt_value = dt_value.with_nanosecond(new_ns).unwrap(); + dt_value = dt_value + .with_nanosecond(new_ns) + .ok_or(ArrowError::ComputeError( + "Failed to set nanoseconds".to_string(), + ))?; } if !units_mask[9] { // Seconds - dt_value = dt_value.with_second(0).unwrap(); + dt_value = dt_value.with_second(0).ok_or(ArrowError::ComputeError( + "Failed to set seconds".to_string(), + ))?; } if !units_mask[8] { // Minutes - dt_value = dt_value.with_minute(0).unwrap(); + dt_value = dt_value.with_minute(0).ok_or(ArrowError::ComputeError( + "Failed to set minutes".to_string(), + ))?; } // Save off day of the year and weekday here, because these will change if the @@ -115,11 +132,11 @@ fn perform_timeunit_start_from_utc( let hour = dt_value.hour(); dt_value .with_hour(0) - .unwrap() - .with_year(2012) - .unwrap() - .with_hour(hour + 1) - .unwrap() + .and_then(|dt| dt.with_year(2012)) + .and_then(|dt| dt.with_hour(hour + 1)) + .ok_or(ArrowError::ComputeError( + "Failed to handle daylight savings boundary".to_string(), + ))? } } @@ -131,35 +148,42 @@ fn perform_timeunit_start_from_utc( let new_month = ((dt_value.month0() as f64 / 3.0).floor() * 3.0) as u32; dt_value = dt_value .with_day0(0) - .unwrap() - .with_month0(new_month) - .unwrap(); + .and_then(|dt| dt.with_month0(new_month)) + .ok_or(ArrowError::ComputeError( + "Failed to truncate to quarter".to_string(), + ))?; } else if units_mask[2] { // Month and not Date // Truncate to first day of the month if !units_mask[3] { - dt_value = dt_value.with_day0(0).unwrap(); + dt_value = dt_value.with_day0(0).ok_or(ArrowError::ComputeError( + "Failed to truncate to first day of the month".to_string(), + ))?; } } else if units_mask[3] { // Date and not Month // Normalize to January, keeping existing day of the month. // (January has 31 days, so this is safe) if !units_mask[2] { - dt_value = dt_value.with_month0(0).unwrap(); + dt_value = dt_value.with_month0(0).ok_or(ArrowError::ComputeError( + "Failed to truncate to first day of the month".to_string(), + ))?; } } else if units_mask[4] { // Week // Step 1: Find the date of the first Sunday in the same calendar year as the date. // This may occur in isoweek 0, or in the final isoweek of the previous year - - let isoweek0_sunday = NaiveDate::from_isoywd_opt(dt_value.year(), 1, Weekday::Sun) - .expect("invalid or out-of-range datetime"); + let isoweek0_sunday = NaiveDate::from_isoywd_opt(dt_value.year(), 1, Weekday::Sun).ok_or( + ArrowError::ComputeError("invalid or out-of-range datetime".to_string()), + )?; let isoweek0_sunday = NaiveDateTime::new(isoweek0_sunday, dt_value.time()); let isoweek0_sunday = in_tz .from_local_datetime(&isoweek0_sunday) .earliest() - .unwrap(); + .ok_or(ArrowError::ComputeError( + "invalid or out-of-range datetime".to_string(), + ))?; // Subtract one week from isoweek0_sunday and check if it's still in the same calendar // year @@ -188,11 +212,15 @@ fn perform_timeunit_start_from_utc( // (which is January 1st) let first_sunday_of_2012 = in_tz .from_local_datetime(&NaiveDateTime::new( - NaiveDate::from_ymd_opt(2012, 1, 1).expect("invalid or out-of-range datetime"), + NaiveDate::from_ymd_opt(2012, 1, 1).ok_or(ArrowError::ComputeError( + "invalid or out-of-range datetime".to_string(), + ))?, dt_value.time(), )) .earliest() - .unwrap(); + .ok_or(ArrowError::ComputeError( + "invalid or out-of-range datetime".to_string(), + ))?; dt_value = first_sunday_of_2012 + chrono::Duration::weeks(week_number); } else { @@ -207,20 +235,34 @@ fn perform_timeunit_start_from_utc( } else { NaiveDate::from_isoywd_opt(dt_value.year(), 2, weekday) } - .expect("invalid or out-of-range datetime"); + .ok_or(ArrowError::ComputeError( + "invalid or out-of-range datetime".to_string(), + ))?; let new_datetime = NaiveDateTime::new(new_date, dt_value.time()); - dt_value = in_tz.from_local_datetime(&new_datetime).earliest().unwrap(); + dt_value = + in_tz + .from_local_datetime(&new_datetime) + .earliest() + .ok_or(ArrowError::ComputeError( + "invalid or out-of-range datetime".to_string(), + ))?; } else if units_mask[6] { // DayOfYear // Keep the same day of the year - dt_value = dt_value.with_ordinal0(ordinal0).unwrap(); + dt_value = dt_value + .with_ordinal0(ordinal0) + .ok_or(ArrowError::ComputeError( + "invalid or out-of-range datetime".to_string(), + ))?; } else { // Clear month and date - dt_value = dt_value.with_ordinal0(0).unwrap(); + dt_value = dt_value.with_ordinal0(0).ok_or(ArrowError::ComputeError( + "invalid or out-of-range datetime".to_string(), + ))?; } - dt_value + Ok(dt_value) } #[derive(Debug, Clone)] @@ -296,9 +338,12 @@ impl ScalarUDFImpl for TimeunitStartUDF { let (timestamp, tz, units_mask) = unpack_timeunit_udf_args(args)?; let array = timestamp.as_any().downcast_ref::().unwrap(); - let result_array: TimestampMillisecondArray = unary(array, |value| { - perform_timeunit_start_from_utc(value, units_mask.as_slice(), tz).timestamp_millis() - }); + let result_array: TimestampMillisecondArray = try_unary(array, |value| { + Ok( + perform_timeunit_start_from_utc(value, units_mask.as_slice(), tz)? + .timestamp_millis(), + ) + })?; Ok(ColumnarValue::Array(Arc::new(result_array) as ArrayRef)) } diff --git a/vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs b/vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs index de4c8e8f..def4af37 100644 --- a/vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs +++ b/vegafusion-datafusion-udfs/src/udfs/datetime/to_utc_timestamp.rs @@ -117,8 +117,7 @@ pub fn to_utc_timestamp(timestamp_array: ArrayRef, tz: Tz) -> Result