Merge remote-tracking branch 'upstream/master' into stateless-row-encoding
tustvold committed Sep 17, 2023
2 parents b864c5e + d960379 commit b5529db
Showing 6 changed files with 81 additions and 71 deletions.
29 changes: 12 additions & 17 deletions arrow-cast/src/parse.rs
@@ -277,16 +277,11 @@ pub fn string_to_timestamp_nanos(s: &str) -> Result<i64, ArrowError> {
     to_timestamp_nanos(string_to_datetime(&Utc, s)?.naive_utc())
 }
 
-/// Defensive check to prevent chrono-rs panics when nanosecond conversion happens on non-supported dates
+/// Fallible conversion of [`NaiveDateTime`] to `i64` nanoseconds
 #[inline]
 fn to_timestamp_nanos(dt: NaiveDateTime) -> Result<i64, ArrowError> {
-    if dt.timestamp().checked_mul(1_000_000_000).is_none() {
-        return Err(ArrowError::ParseError(
-            ERR_NANOSECONDS_NOT_SUPPORTED.to_string(),
-        ));
-    }
-
-    Ok(dt.timestamp_nanos())
+    dt.timestamp_nanos_opt()
+        .ok_or_else(|| ArrowError::ParseError(ERR_NANOSECONDS_NOT_SUPPORTED.to_string()))
 }
 
 /// Accepts a string in ISO8601 standard format and some
@@ -1313,12 +1308,12 @@ mod tests {
 
         // Ensure both T and ' ' variants work
         assert_eq!(
-            naive_datetime.timestamp_nanos(),
+            naive_datetime.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08T13:42:29.190855").unwrap()
         );
 
         assert_eq!(
-            naive_datetime.timestamp_nanos(),
+            naive_datetime.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08 13:42:29.190855").unwrap()
         );
 
@@ -1331,12 +1326,12 @@ mod tests {
 
         // Ensure both T and ' ' variants work
        assert_eq!(
-            naive_datetime_whole_secs.timestamp_nanos(),
+            naive_datetime_whole_secs.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08T13:42:29").unwrap()
         );
 
         assert_eq!(
-            naive_datetime_whole_secs.timestamp_nanos(),
+            naive_datetime_whole_secs.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08 13:42:29").unwrap()
         );
 
@@ -1349,7 +1344,7 @@ mod tests {
         );
 
         assert_eq!(
-            naive_datetime_no_time.timestamp_nanos(),
+            naive_datetime_no_time.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08").unwrap()
         )
     }
@@ -1463,12 +1458,12 @@ mod tests {
 
         // Ensure both T and ' ' variants work
         assert_eq!(
-            naive_datetime.timestamp_nanos(),
+            naive_datetime.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08T13:42:29.190855").unwrap()
         );
 
         assert_eq!(
-            naive_datetime.timestamp_nanos(),
+            naive_datetime.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08 13:42:29.190855").unwrap()
         );
 
@@ -1479,12 +1474,12 @@ mod tests {
 
         // Ensure both T and ' ' variants work
         assert_eq!(
-            naive_datetime.timestamp_nanos(),
+            naive_datetime.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08T13:42:29").unwrap()
         );
 
         assert_eq!(
-            naive_datetime.timestamp_nanos(),
+            naive_datetime.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08 13:42:29").unwrap()
         );
 
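Note on the parse.rs change above: chrono's `timestamp_nanos_opt` returns `None` instead of panicking when a datetime falls outside the range representable as `i64` nanoseconds since the Unix epoch (roughly 1677-09-21 through 2262-04-11 UTC), which is the condition the removed `checked_mul` guard approximated using whole seconds. A minimal standalone sketch of that behavior, assuming chrono 0.4.31 or later; it is illustrative and not part of this commit:

    use chrono::NaiveDate;

    fn main() {
        // 2262-04-11 is the last day whose timestamps still fit in i64 nanoseconds.
        let in_range = NaiveDate::from_ymd_opt(2262, 4, 11)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap();
        assert!(in_range.timestamp_nanos_opt().is_some());

        // One year later overflows, so the conversion yields None rather than panicking.
        let out_of_range = NaiveDate::from_ymd_opt(2263, 4, 11)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap();
        assert!(out_of_range.timestamp_nanos_opt().is_none());
    }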
34 changes: 20 additions & 14 deletions arrow-csv/src/reader/mod.rs
@@ -984,20 +984,26 @@ fn build_timestamp_array_impl<T: ArrowTimestampType, Tz: TimeZone>(
                 return Ok(None);
             }
 
-            let date = string_to_datetime(timezone, s).map_err(|e| {
-                ArrowError::ParseError(format!(
-                    "Error parsing column {col_idx} at line {}: {}",
-                    line_number + row_index,
-                    e
-                ))
-            })?;
-
-            Ok(Some(match T::UNIT {
-                TimeUnit::Second => date.timestamp(),
-                TimeUnit::Millisecond => date.timestamp_millis(),
-                TimeUnit::Microsecond => date.timestamp_micros(),
-                TimeUnit::Nanosecond => date.timestamp_nanos(),
-            }))
+            let date = string_to_datetime(timezone, s)
+                .and_then(|date| match T::UNIT {
+                    TimeUnit::Second => Ok(date.timestamp()),
+                    TimeUnit::Millisecond => Ok(date.timestamp_millis()),
+                    TimeUnit::Microsecond => Ok(date.timestamp_micros()),
+                    TimeUnit::Nanosecond => date.timestamp_nanos_opt().ok_or_else(|| {
+                        ArrowError::ParseError(format!(
+                            "{} would overflow 64-bit signed nanoseconds",
+                            date.to_rfc3339(),
+                        ))
+                    }),
+                })
+                .map_err(|e| {
+                    ArrowError::ParseError(format!(
+                        "Error parsing column {col_idx} at line {}: {}",
+                        line_number + row_index,
+                        e
+                    ))
+                })?;
+            Ok(Some(date))
         })
         .collect()
 }
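The CSV reader change routes nanosecond overflow through the same error path as parse failures: `and_then` performs the unit conversion fallibly, then the outer `map_err` attaches column and line context to whichever error occurred. A reduced sketch of that combinator shape, using a plain `String` error and a hypothetical `parse_nanos` helper instead of the real `ArrowError` plumbing:

    use chrono::{DateTime, Utc};

    // Parse, then fallibly convert to nanoseconds, then wrap any error
    // with positional context, mirroring the structure of the new code.
    fn parse_nanos(s: &str, col: usize, line: usize) -> Result<i64, String> {
        s.parse::<DateTime<Utc>>()
            .map_err(|e| e.to_string())
            .and_then(|date| {
                date.timestamp_nanos_opt().ok_or_else(|| {
                    format!("{} would overflow 64-bit signed nanoseconds", date.to_rfc3339())
                })
            })
            .map_err(|e| format!("Error parsing column {col} at line {line}: {e}"))
    }

    fn main() {
        assert!(parse_nanos("2020-09-08T13:42:29Z", 0, 1).is_ok());
        assert!(parse_nanos("9999-01-01T00:00:00Z", 0, 2).is_err());
    }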
2 changes: 1 addition & 1 deletion arrow-flight/gen/Cargo.toml
@@ -32,6 +32,6 @@ publish = false
 [dependencies]
 # Pin specific version of the tonic-build dependencies to avoid auto-generated
 # (and checked in) arrow.flight.protocol.rs from changing
-proc-macro2 = { version = "=1.0.66", default-features = false }
+proc-macro2 = { version = "=1.0.67", default-features = false }
 prost-build = { version = "=0.11.9", default-features = false }
 tonic-build = { version = "=0.9.2", default-features = false, features = ["transport", "prost"] }
9 changes: 8 additions & 1 deletion arrow-json/src/reader/timestamp_array.rs
@@ -71,7 +71,14 @@ where
                     TimeUnit::Second => date.timestamp(),
                     TimeUnit::Millisecond => date.timestamp_millis(),
                     TimeUnit::Microsecond => date.timestamp_micros(),
-                    TimeUnit::Nanosecond => date.timestamp_nanos(),
+                    TimeUnit::Nanosecond => {
+                        date.timestamp_nanos_opt().ok_or_else(|| {
+                            ArrowError::ParseError(format!(
+                                "{} would overflow 64-bit signed nanoseconds",
+                                date.to_rfc3339(),
+                            ))
+                        })?
+                    }
                 };
                 builder.append_value(value)
             }
6 changes: 4 additions & 2 deletions arrow-json/src/writer.rs
@@ -757,7 +757,8 @@ mod tests {
         let ts_nanos = ts_string
             .parse::<chrono::NaiveDateTime>()
             .unwrap()
-            .timestamp_nanos();
+            .timestamp_nanos_opt()
+            .unwrap();
         let ts_micros = ts_nanos / 1000;
         let ts_millis = ts_micros / 1000;
         let ts_secs = ts_millis / 1000;
@@ -809,7 +810,8 @@ mod tests {
         let ts_nanos = ts_string
             .parse::<chrono::NaiveDateTime>()
             .unwrap()
-            .timestamp_nanos();
+            .timestamp_nanos_opt()
+            .unwrap();
         let ts_micros = ts_nanos / 1000;
         let ts_millis = ts_micros / 1000;
         let ts_secs = ts_millis / 1000;
72 changes: 36 additions & 36 deletions arrow-select/src/concat.rs
@@ -159,7 +159,12 @@ fn concat_fallback(
     Ok(make_array(mutable.freeze()))
 }
 
-/// Concatenates `batches` together into a single record batch.
+/// Concatenates `batches` together into a single [`RecordBatch`].
+///
+/// The output batch has the specified `schema`; the schemas of the
+/// input batches are ignored.
+///
+/// Returns an error if the types of the underlying arrays are different.
 pub fn concat_batches<'a>(
     schema: &SchemaRef,
     input_batches: impl IntoIterator<Item = &'a RecordBatch>,
@@ -176,20 +181,6 @@ pub fn concat_batches<'a>(
     if batches.is_empty() {
         return Ok(RecordBatch::new_empty(schema.clone()));
     }
-    if let Some((i, _)) = batches
-        .iter()
-        .enumerate()
-        .find(|&(_, batch)| batch.schema() != *schema)
-    {
-        return Err(ArrowError::InvalidArgumentError(format!(
-            "batches[{i}] schema is different with argument schema.
-            batches[{i}] schema: {:?},
-            argument schema: {:?}
-            ",
-            batches[i].schema(),
-            *schema
-        )));
-    }
     let field_num = schema.fields().len();
     let mut arrays = Vec::with_capacity(field_num);
     for i in 0..field_num {
@@ -727,36 +718,45 @@ mod tests {
     }
 
     #[test]
-    fn concat_record_batches_of_different_schemas() {
-        let schema1 = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Utf8, false),
-        ]));
-        let schema2 = Arc::new(Schema::new(vec![
-            Field::new("c", DataType::Int32, false),
-            Field::new("d", DataType::Utf8, false),
-        ]));
+    fn concat_record_batches_of_different_schemas_but_compatible_data() {
+        let schema1 =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        // column names differ
+        let schema2 =
+            Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
         let batch1 = RecordBatch::try_new(
             schema1.clone(),
-            vec![
-                Arc::new(Int32Array::from(vec![1, 2])),
-                Arc::new(StringArray::from(vec!["a", "b"])),
-            ],
+            vec![Arc::new(Int32Array::from(vec![1, 2]))],
         )
         .unwrap();
+        let batch2 =
+            RecordBatch::try_new(schema2, vec![Arc::new(Int32Array::from(vec![3, 4]))])
+                .unwrap();
+        // concat_batches simply uses the schema provided
+        let batch = concat_batches(&schema1, [&batch1, &batch2]).unwrap();
+        assert_eq!(batch.schema().as_ref(), schema1.as_ref());
+        assert_eq!(4, batch.num_rows());
+    }
+
+    #[test]
+    fn concat_record_batches_of_different_schemas_incompatible_data() {
+        let schema1 =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        // column types differ (same name, Int32 vs Utf8)
+        let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2]))],
+        )
+        .unwrap();
         let batch2 = RecordBatch::try_new(
             schema2,
-            vec![
-                Arc::new(Int32Array::from(vec![3, 4])),
-                Arc::new(StringArray::from(vec!["c", "d"])),
-            ],
+            vec![Arc::new(StringArray::from(vec!["foo", "bar"]))],
         )
         .unwrap();
 
         let error = concat_batches(&schema1, [&batch1, &batch2]).unwrap_err();
-        assert_eq!(
-            error.to_string(),
-            "Invalid argument error: batches[1] schema is different with argument schema.\n batches[1] schema: Schema { fields: [Field { name: \"c\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"d\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} },\n argument schema: Schema { fields: [Field { name: \"a\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"b\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }\n "
-        );
+        assert_eq!(error.to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types.");
     }
 
     #[test]
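The concat.rs change relaxes `concat_batches`: the up-front schema equality check is removed, the schema passed as an argument is taken as authoritative, and the call now fails only when the underlying array types cannot be concatenated. A usage sketch under that new contract, assuming the `arrow_array`, `arrow_schema`, and `arrow_select` sub-crates of this repository; illustrative only:

    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use arrow_select::concat::concat_batches;

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        // Same column type, different column name: previously rejected, now accepted.
        let other = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2]))],
        )
        .unwrap();
        let batch2 = RecordBatch::try_new(
            other,
            vec![Arc::new(Int32Array::from(vec![3, 4]))],
        )
        .unwrap();

        // The output batch carries the schema passed as the first argument.
        let combined = concat_batches(&schema, [&batch1, &batch2]).unwrap();
        assert_eq!(combined.num_rows(), 4);
        assert_eq!(combined.schema().field(0).name(), "a");
    }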
