diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 659e2c0ee3a7..ab4292e0e0eb 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -66,6 +66,7 @@ tokio = { version = "1.0", optional = true, default-features = false, features =
 hashbrown = { version = "0.14", default-features = false }
 twox-hash = { version = "1.6", default-features = false }
 paste = { version = "1.0" }
+half = { version = "2.1", default-features = false }
 
 [dev-dependencies]
 base64 = { version = "0.21", default-features = false, features = ["std"] }
diff --git a/parquet/regen.sh b/parquet/regen.sh
index b8c3549e2324..91539634339d 100755
--- a/parquet/regen.sh
+++ b/parquet/regen.sh
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-REVISION=aeae80660c1d0c97314e9da837de1abdebd49c37
+REVISION=46cc3a0647d301bb9579ca8dd2cc356caf2a72d2
 
 SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)"
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index b06091b6b57a..072130833903 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -28,12 +28,13 @@ use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
 use arrow_array::{
-    ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray,
+    ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
     IntervalDayTimeArray, IntervalYearMonthArray,
 };
 use arrow_buffer::{i256, Buffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::{DataType as ArrowType, IntervalUnit};
+use half::f16;
 use std::any::Any;
 use std::ops::Range;
 use std::sync::Arc;
@@ -88,6 +89,14 @@ pub fn make_fixed_len_byte_array_reader(
                 ));
             }
         }
+        ArrowType::Float16 => {
+            if byte_length != 2 {
+                return Err(general_err!(
+                    "float 16 type must be 2 bytes, got {}",
+                    byte_length
+                ));
+            }
+        }
         _ => {
             return Err(general_err!(
                 "invalid data type for fixed length byte array reader - {}",
@@ -153,11 +162,10 @@ impl ArrayReader for FixedLenByteArrayReader {
     fn consume_batch(&mut self) -> Result<ArrayRef> {
         let record_data = self.record_reader.consume_record_data();
 
-        let array_data =
-            ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
-                .len(self.record_reader.num_values())
-                .add_buffer(record_data)
-                .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
+        let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
+            .len(self.record_reader.num_values())
+            .add_buffer(record_data)
+            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
 
         let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });
@@ -188,19 +196,13 @@ impl ArrayReader for FixedLenByteArrayReader {
                     IntervalUnit::YearMonth => Arc::new(
                         binary
                             .iter()
-                            .map(|o| {
-                                o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))
-                            })
+                            .map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap())))
                             .collect::<IntervalYearMonthArray>(),
                     ) as ArrayRef,
                     IntervalUnit::DayTime => Arc::new(
                         binary
                             .iter()
-                            .map(|o| {
-                                o.map(|b| {
-                                    i64::from_le_bytes(b[4..12].try_into().unwrap())
-                                })
-                            })
+                            .map(|o| o.map(|b| i64::from_le_bytes(b[4..12].try_into().unwrap())))
                             .collect::<IntervalDayTimeArray>(),
                     ) as ArrayRef,
                     IntervalUnit::MonthDayNano => {
                         return Err(nyi_err!("MonthDayNano intervals not supported"));
                     }
                 }
             }
+            ArrowType::Float16 => Arc::new(
+                binary
+                    .iter()
+                    .map(|o| o.map(|b| f16::from_le_bytes(b[..2].try_into().unwrap())))
+                    .collect::<Float16Array>(),
+            ) as ArrayRef,
             _ => Arc::new(binary) as ArrayRef,
         };
@@ -278,9 +286,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
         let slice = self.buffer.as_slice_mut();
 
         let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in
-            values_range.rev().zip(iter_set_bits_rev(valid_mask))
-        {
+        for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
             debug_assert!(level_pos >= value_pos);
             if level_pos <= value_pos {
                 break;
@@ -376,8 +382,7 @@ impl ColumnValueDecoder for ValueDecoder {
         let len = range.end - range.start;
         match self.decoder.as_mut().unwrap() {
             Decoder::Plain { offset, buf } => {
-                let to_read =
-                    (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
+                let to_read = (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
                 let end_offset = *offset + to_read * self.byte_length;
                 out.buffer
                     .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
@@ -470,15 +475,12 @@ mod tests {
             .build()
             .unwrap();
 
-        let written = RecordBatch::try_from_iter([(
-            "list",
-            Arc::new(ListArray::from(data)) as ArrayRef,
-        )])
-        .unwrap();
+        let written =
+            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
+                .unwrap();
 
         let mut buffer = Vec::with_capacity(1024);
-        let mut writer =
-            ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
+        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
         writer.write(&written).unwrap();
         writer.close().unwrap();
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 16cdf2934e6f..9ed63086c651 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -712,6 +712,7 @@ mod tests {
     use std::sync::Arc;
 
     use bytes::Bytes;
+    use half::f16;
     use num::PrimInt;
     use rand::{thread_rng, Rng, RngCore};
     use tempfile::tempfile;
@@ -924,6 +925,53 @@ mod tests {
         .unwrap();
     }
 
+    #[test]
+    fn test_float16_roundtrip() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "float16",
+            ArrowDataType::Float16,
+            true,
+        )]));
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
+
+        let original = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(Float16Array::from_iter_values([
+                f16::EPSILON,
+                f16::INFINITY,
+                f16::MIN,
+                f16::MAX,
+                f16::NAN,
+                f16::INFINITY,
+                f16::NEG_INFINITY,
+                f16::ONE,
+                f16::NEG_ONE,
+                f16::ZERO,
+                f16::NEG_ZERO,
+                f16::E,
+                f16::PI,
+                f16::FRAC_1_PI,
+            ]))],
+        )?;
+
+        writer.write(&original)?;
+        writer.close()?;
+
+        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
+        let ret = reader.next().unwrap()?;
+        assert_eq!(ret, original);
+
+        // Ensure can be downcast to the correct type
+        ret.column(0)
+            .as_any()
+            .downcast_ref::<Float16Array>()
+            .unwrap();
+
+        Ok(())
+    }
+
     struct RandFixedLenGen {}
 
     impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index a9cd1afb2479..df218f354d98 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -771,6 +771,13 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
+            ArrowDataType::Float16 => {
+                let array = column
+                    .as_any()
+                    .downcast_ref::<Float16Array>()
+                    .unwrap();
+                get_float_16_array_slice(array, indices)
+            }
             _ => {
                 return Err(ParquetError::NYI(
                     "Attempting to write an Arrow type that is not yet implemented".to_string(),
                 ));
             }
         }
@@ -867,6 +874,18 @@ fn get_decimal_256_array_slice(
     values
 }
 
+fn get_float_16_array_slice(
+    array: &arrow_array::Float16Array,
+    indices: &[usize],
+) -> Vec<FixedLenByteArray> {
+    let mut values = Vec::with_capacity(indices.len());
+    for i in indices {
+        let value = array.value(*i).to_le_bytes().to_vec();
+        values.push(FixedLenByteArray::from(ByteArray::from(value)));
+    }
+    values
+}
+
 fn get_fsb_array_slice(
     array: &arrow_array::FixedSizeBinaryArray,
     indices: &[usize],
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index d56cc42d4313..1b44c0123089 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -32,8 +32,7 @@ use arrow_ipc::writer;
 use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 
 use crate::basic::{
-    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit,
-    Type as PhysicalType,
+    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
 };
 use crate::errors::{ParquetError, Result};
 use crate::file::{metadata::KeyValue, properties::WriterProperties};
@@ -55,11 +54,7 @@ pub fn parquet_to_arrow_schema(
     parquet_schema: &SchemaDescriptor,
     key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema> {
-    parquet_to_arrow_schema_by_columns(
-        parquet_schema,
-        ProjectionMask::all(),
-        key_value_metadata,
-    )
+    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
 }
 
 /// Convert parquet schema to arrow schema including optional metadata,
@@ -199,10 +194,7 @@ fn encode_arrow_schema(schema: &Schema) -> String {
 
 /// Mutates writer metadata by storing the encoded Arrow schema.
 /// If there is an existing Arrow schema metadata, it is replaced.
-pub(crate) fn add_encoded_arrow_schema_to_metadata(
-    schema: &Schema,
-    props: &mut WriterProperties,
-) {
+pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
     let encoded = encode_arrow_schema(schema);
 
     let schema_kv = KeyValue {
@@ -270,16 +262,15 @@ fn parse_key_value_metadata(
 /// Convert parquet column schema to arrow field.
 pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
     let field = complex::convert_type(&parquet_column.self_type_ptr())?;
-    let mut ret = Field::new(
-        parquet_column.name(),
-        field.arrow_type,
-        field.nullable,
-    );
+    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
 
     let basic_info = parquet_column.self_type().get_basic_info();
     if basic_info.has_id() {
         let mut meta = HashMap::with_capacity(1);
-        meta.insert(PARQUET_FIELD_ID_META_KEY.to_string(), basic_info.id().to_string());
+        meta.insert(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            basic_info.id().to_string(),
+        );
         ret.set_metadata(meta);
     }
 
@@ -373,7 +364,12 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             .with_repetition(repetition)
             .with_id(id)
             .build(),
-        DataType::Float16 => Err(arrow_err!("Float16 arrays not supported")),
+        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
+            .with_repetition(repetition)
+            .with_id(id)
+            .with_logical_type(Some(LogicalType::Float16))
+            .with_length(2)
+            .build(),
         DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
             .with_repetition(repetition)
             .with_id(id)
@@ -396,15 +392,9 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                 unit: match time_unit {
                     TimeUnit::Second => unreachable!(),
-                    TimeUnit::Millisecond => {
-                        ParquetTimeUnit::MILLIS(Default::default())
-                    }
-                    TimeUnit::Microsecond => {
-                        ParquetTimeUnit::MICROS(Default::default())
-                    }
-                    TimeUnit::Nanosecond => {
-                        ParquetTimeUnit::NANOS(Default::default())
-                    }
+                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
+                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
+                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                 },
             }))
             .with_repetition(repetition)
@@ -452,9 +442,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             .with_repetition(repetition)
             .with_id(id)
             .build(),
-        DataType::Duration(_) => {
-            Err(arrow_err!("Converting Duration to parquet not supported",))
-        }
+        DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)),
         DataType::Interval(_) => {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_converted_type(ConvertedType::INTERVAL)
@@ -476,8 +464,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 .with_length(*length)
                 .build()
         }
-        DataType::Decimal128(precision, scale)
-        | DataType::Decimal256(precision, scale) => {
+        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
             // Decimal precision determines the Parquet physical type to use.
             // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
             let (physical_type, length) = if *precision > 1 && *precision <= 9 {
@@ -524,9 +511,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         }
         DataType::Struct(fields) => {
             if fields.is_empty() {
-                return Err(
-                    arrow_err!("Parquet does not support writing empty structs",),
-                );
+                return Err(arrow_err!("Parquet does not support writing empty structs",));
             }
             // recursively convert children to types/nodes
             let fields = fields
@@ -604,9 +589,10 @@ mod tests {
             REQUIRED INT32 uint8 (INTEGER(8,false));
             REQUIRED INT32 uint16 (INTEGER(16,false));
             REQUIRED INT32 int32;
-            REQUIRED INT64 int64 ;
+            REQUIRED INT64 int64;
             OPTIONAL DOUBLE double;
             OPTIONAL FLOAT float;
+            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
             OPTIONAL BINARY string (UTF8);
             OPTIONAL BINARY string_2 (STRING);
             OPTIONAL BINARY json (JSON);
@@ -615,8 +601,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
 
         let arrow_fields = Fields::from(vec![
             Field::new("boolean", DataType::Boolean, false),
@@ -628,6 +613,7 @@ mod tests {
             Field::new("int64", DataType::Int64, false),
             Field::new("double", DataType::Float64, true),
             Field::new("float", DataType::Float32, true),
+            Field::new("float16", DataType::Float16, true),
             Field::new("string", DataType::Utf8, true),
             Field::new("string_2", DataType::Utf8, true),
             Field::new("json", DataType::Utf8, true),
@@ -653,8 +639,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
 
         let arrow_fields = Fields::from(vec![
             Field::new("decimal1", DataType::Decimal128(4, 2), false),
@@ -680,8 +665,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
 
         let arrow_fields = Fields::from(vec![
             Field::new("binary", DataType::Binary, false),
@@ -702,8 +686,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
 
         let arrow_fields = Fields::from(vec![
             Field::new("boolean", DataType::Boolean, false),
         ]);
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
 
-        let converted_arrow_schema = parquet_to_arrow_schema_by_columns(
-            &parquet_schema,
-            ProjectionMask::all(),
-            None,
-        )
-        .unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
+                .unwrap();
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
     }
@@ -914,8 +894,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -993,8 +972,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1088,8 +1066,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1106,8 +1083,7 @@ mod tests {
             Field::new("leaf1", DataType::Boolean, false),
             Field::new("leaf2", DataType::Int32, false),
         ]);
-        let group1_struct =
-            Field::new("group1", DataType::Struct(group1_fields), false);
+        let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
         arrow_fields.push(group1_struct);
 
         let leaf3_field = Field::new("leaf3", DataType::Int64, false);
@@ -1126,8 +1102,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1280,8 +1255,7 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1303,6 +1277,7 @@ mod tests {
             REQUIRED INT64 int64;
             OPTIONAL DOUBLE double;
             OPTIONAL FLOAT float;
+            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
             OPTIONAL BINARY string (UTF8);
             REPEATED BOOLEAN bools;
             OPTIONAL INT32 date (DATE);
@@ -1339,6 +1314,7 @@ mod tests {
             Field::new("int64", DataType::Int64, false),
             Field::new("double", DataType::Float64, true),
             Field::new("float", DataType::Float32, true),
+            Field::new("float16", DataType::Float16, true),
             Field::new("string", DataType::Utf8, true),
             Field::new_list(
                 "bools",
@@ -1398,6 +1374,7 @@ mod tests {
             REQUIRED INT64 int64;
             OPTIONAL DOUBLE double;
             OPTIONAL FLOAT float;
+            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
             OPTIONAL BINARY string (STRING);
             OPTIONAL GROUP bools (LIST) {
                 REPEATED GROUP list {
@@ -1448,6 +1425,7 @@ mod tests {
             Field::new("int64", DataType::Int64, false),
             Field::new("double", DataType::Float64, true),
             Field::new("float", DataType::Float32, true),
+            Field::new("float16", DataType::Float16, true),
             Field::new("string", DataType::Utf8, true),
             Field::new_list(
                 "bools",
@@ -1502,20 +1480,11 @@ mod tests {
                 vec![
                     Field::new("bools", DataType::Boolean, false),
                     Field::new("uint32", DataType::UInt32, false),
-                    Field::new_list(
-                        "int32",
-                        Field::new("element", DataType::Int32, true),
-                        false,
-                    ),
+                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
                 ],
                 false,
             ),
-            Field::new_dictionary(
-                "dictionary_strings",
-                DataType::Int32,
-                DataType::Utf8,
-                false,
-            ),
+            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
             Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
             Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
             Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
@@ -1600,10 +1569,8 @@ mod tests {
 
         let schema = Schema::new_with_metadata(
             vec![
-                Field::new("c1", DataType::Utf8, false).with_metadata(meta(&[
-                    ("Key", "Foo"),
-                    (PARQUET_FIELD_ID_META_KEY, "2"),
-                ])),
+                Field::new("c1", DataType::Utf8, false)
+                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
                 Field::new("c2", DataType::Binary, false),
                 Field::new("c3", DataType::FixedSizeBinary(3), false),
                 Field::new("c4", DataType::Boolean, false),
@@ -1621,10 +1588,7 @@ mod tests {
             ),
             Field::new(
                 "c17",
-                DataType::Timestamp(
-                    TimeUnit::Microsecond,
-                    Some("Africa/Johannesburg".into()),
-                ),
+                DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                 false,
             ),
             Field::new(
@@ -1636,10 +1600,8 @@ mod tests {
             Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
             Field::new_list(
                 "c21",
-                Field::new("item", DataType::Boolean, true).with_metadata(meta(&[
-                    ("Key", "Bar"),
-                    (PARQUET_FIELD_ID_META_KEY, "5"),
-                ])),
+                Field::new("item", DataType::Boolean, true)
+                    .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                 false,
             )
             .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
@@ -1661,6 +1623,8 @@ mod tests {
                     vec![
                         Field::new("a", DataType::Int16, true),
                         Field::new("b", DataType::Float64, false),
+                        Field::new("c", DataType::Float32, false),
+                        Field::new("d", DataType::Float16, false),
                     ]
                     .into(),
                 ),
@@ -1687,10 +1651,7 @@ mod tests {
             // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
             Field::new_dict(
                 "c31",
-                DataType::Dictionary(
-                    Box::new(DataType::Int32),
-                    Box::new(DataType::Utf8),
-                ),
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                 true,
                 123,
                 true,
@@ -1725,11 +1686,7 @@ mod tests {
                 "c39",
                 "key_value",
                 Field::new("key", DataType::Utf8, false),
-                Field::new_list(
-                    "value",
-                    Field::new("element", DataType::Utf8, true),
-                    true,
-                ),
+                Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                 false, // fails to roundtrip keys_sorted
                 true,
             ),
@@ -1768,11 +1725,8 @@ mod tests {
 
         // write to an empty parquet file so that schema is serialized
         let file = tempfile::tempfile().unwrap();
-        let writer = ArrowWriter::try_new(
-            file.try_clone().unwrap(),
-            Arc::new(schema.clone()),
-            None,
-        )?;
+        let writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
         writer.close()?;
 
         // read file back
@@ -1831,33 +1785,23 @@ mod tests {
         };
         let schema = Schema::new_with_metadata(
             vec![
-                Field::new("c1", DataType::Utf8, true).with_metadata(meta(&[
-                    (PARQUET_FIELD_ID_META_KEY, "1"),
-                ])),
-                Field::new("c2", DataType::Utf8, true).with_metadata(meta(&[
-                    (PARQUET_FIELD_ID_META_KEY, "2"),
-                ])),
+ Field::new("c1", DataType::Utf8, true) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])), + Field::new("c2", DataType::Utf8, true) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])), ], HashMap::new(), ); - let writer = ArrowWriter::try_new( - vec![], - Arc::new(schema.clone()), - None, - )?; + let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?; let parquet_bytes = writer.into_inner()?; - let reader = crate::file::reader::SerializedFileReader::new( - bytes::Bytes::from(parquet_bytes), - )?; + let reader = + crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?; let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr(); // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema - let arrow_schema = crate::arrow::parquet_to_arrow_schema( - &schema_descriptor, - None, - )?; + let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?; let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; let parq_fields = parq_schema_descr.root_schema().get_fields(); @@ -1870,19 +1814,14 @@ mod tests { #[test] fn test_arrow_schema_roundtrip_lists() -> Result<()> { - let metadata: HashMap = - [("Key".to_string(), "Value".to_string())] - .iter() - .cloned() - .collect(); + let metadata: HashMap = [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); let schema = Schema::new_with_metadata( vec![ - Field::new_list( - "c21", - Field::new("array", DataType::Boolean, true), - false, - ), + Field::new_list("c21", Field::new("array", DataType::Boolean, true), false), Field::new( "c22", DataType::FixedSizeList( @@ -1913,11 +1852,8 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let writer = ArrowWriter::try_new( - file.try_clone().unwrap(), - Arc::new(schema.clone()), - None, - )?; + let writer = + ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?; writer.close()?; // read file back diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs index 7d8b6a04ee81..447fe5fc3ab4 100644 --- a/parquet/src/arrow/schema/primitive.rs +++ b/parquet/src/arrow/schema/primitive.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::basic::{ - ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType, -}; +use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType}; use crate::errors::{ParquetError, Result}; use crate::schema::types::{BasicTypeInfo, Type}; use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION}; @@ -158,9 +156,7 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result Ok(DataType::UInt32), _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)), }, - (Some(LogicalType::Decimal { scale, precision }), _) => { - decimal_128_type(scale, precision) - } + (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision), (Some(LogicalType::Date), _) => Ok(DataType::Date32), (Some(LogicalType::Time { unit, .. 
}), _) => match unit { ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)), @@ -237,9 +233,7 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result { - decimal_128_type(scale, precision) - } + (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision), (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision), (logical, converted) => Err(arrow_err!( "Unable to convert parquet INT64 logical type {:?} or converted type {}", @@ -304,6 +298,16 @@ fn from_fixed_len_byte_array( // would be incorrect if all 12 bytes of the interval are populated Ok(DataType::Interval(IntervalUnit::DayTime)) } + (Some(LogicalType::Float16), _) => { + if type_length == 2 { + Ok(DataType::Float16) + } else { + Err(ParquetError::General( + "FLOAT16 logical type must be Fixed Length Byte Array with length 2" + .to_string(), + )) + } + } _ => Ok(DataType::FixedSizeBinary(type_length)), } } diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index ab71aa44169b..4c9cb018d4e5 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -194,6 +194,7 @@ pub enum LogicalType { Json, Bson, Uuid, + Float16, } // ---------------------------------------------------------------------- @@ -478,6 +479,7 @@ impl ColumnOrder { LogicalType::Timestamp { .. } => SortOrder::SIGNED, LogicalType::Unknown => SortOrder::UNDEFINED, LogicalType::Uuid => SortOrder::UNSIGNED, + LogicalType::Float16 => SortOrder::SIGNED, }, // Fall back to converted type None => Self::get_converted_sort_order(converted_type, physical_type), @@ -739,6 +741,7 @@ impl From for LogicalType { parquet::LogicalType::JSON(_) => LogicalType::Json, parquet::LogicalType::BSON(_) => LogicalType::Bson, parquet::LogicalType::UUID(_) => LogicalType::Uuid, + parquet::LogicalType::FLOAT16(_) => LogicalType::Float16, } } } @@ -779,6 +782,7 @@ impl From for parquet::LogicalType { LogicalType::Json => parquet::LogicalType::JSON(Default::default()), LogicalType::Bson => parquet::LogicalType::BSON(Default::default()), LogicalType::Uuid => parquet::LogicalType::UUID(Default::default()), + LogicalType::Float16 => parquet::LogicalType::FLOAT16(Default::default()), } } } @@ -826,10 +830,11 @@ impl From> for ConvertedType { (64, false) => ConvertedType::UINT_64, t => panic!("Integer type {t:?} is not supported"), }, - LogicalType::Unknown => ConvertedType::NONE, LogicalType::Json => ConvertedType::JSON, LogicalType::Bson => ConvertedType::BSON, - LogicalType::Uuid => ConvertedType::NONE, + LogicalType::Uuid | LogicalType::Float16 | LogicalType::Unknown => { + ConvertedType::NONE + } }, None => ConvertedType::NONE, } @@ -1075,6 +1080,7 @@ impl str::FromStr for LogicalType { "INTERVAL" => Err(general_err!( "Interval parquet logical type not yet supported" )), + "FLOAT16" => Ok(LogicalType::Float16), other => Err(general_err!("Invalid parquet logical type {}", other)), } } @@ -1719,6 +1725,10 @@ mod tests { ConvertedType::from(Some(LogicalType::Enum)), ConvertedType::ENUM ); + assert_eq!( + ConvertedType::from(Some(LogicalType::Float16)), + ConvertedType::NONE + ); assert_eq!( ConvertedType::from(Some(LogicalType::Unknown)), ConvertedType::NONE @@ -2092,6 +2102,7 @@ mod tests { is_adjusted_to_u_t_c: true, unit: TimeUnit::NANOS(Default::default()), }, + LogicalType::Float16, ]; check_sort_order(signed, SortOrder::SIGNED); diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index b36e37a80c97..345fe7dd2615 100644 --- a/parquet/src/file/statistics.rs +++ 
b/parquet/src/file/statistics.rs @@ -243,6 +243,8 @@ pub fn to_thrift(stats: Option<&Statistics>) -> Option { distinct_count: stats.distinct_count().map(|value| value as i64), max_value: None, min_value: None, + is_max_value_exact: None, + is_min_value_exact: None, }; // Get min/max if set. @@ -607,6 +609,8 @@ mod tests { distinct_count: None, max_value: None, min_value: None, + is_max_value_exact: None, + is_min_value_exact: None, }; from_thrift(Type::INT32, Some(thrift_stats)).unwrap(); diff --git a/parquet/src/format.rs b/parquet/src/format.rs index 46adc39e6406..4700b05dc282 100644 --- a/parquet/src/format.rs +++ b/parquet/src/format.rs @@ -657,16 +657,26 @@ pub struct Statistics { pub null_count: Option, /// count of distinct values occurring pub distinct_count: Option, - /// Min and max values for the column, determined by its ColumnOrder. + /// Lower and upper bound values for the column, determined by its ColumnOrder. + /// + /// These may be the actual minimum and maximum values found on a page or column + /// chunk, but can also be (more compact) values that do not exist on a page or + /// column chunk. For example, instead of storing "Blart Versenwald III", a writer + /// may set min_value="B", max_value="C". Such more compact values must still be + /// valid values within the column's logical type. /// /// Values are encoded using PLAIN encoding, except that variable-length byte /// arrays do not include a length prefix. pub max_value: Option>, pub min_value: Option>, + /// If true, max_value is the actual maximum value for a column + pub is_max_value_exact: Option, + /// If true, min_value is the actual minimum value for a column + pub is_min_value_exact: Option, } impl Statistics { - pub fn new(max: F1, min: F2, null_count: F3, distinct_count: F4, max_value: F5, min_value: F6) -> Statistics where F1: Into>>, F2: Into>>, F3: Into>, F4: Into>, F5: Into>>, F6: Into>> { + pub fn new(max: F1, min: F2, null_count: F3, distinct_count: F4, max_value: F5, min_value: F6, is_max_value_exact: F7, is_min_value_exact: F8) -> Statistics where F1: Into>>, F2: Into>>, F3: Into>, F4: Into>, F5: Into>>, F6: Into>>, F7: Into>, F8: Into> { Statistics { max: max.into(), min: min.into(), @@ -674,6 +684,8 @@ impl Statistics { distinct_count: distinct_count.into(), max_value: max_value.into(), min_value: min_value.into(), + is_max_value_exact: is_max_value_exact.into(), + is_min_value_exact: is_min_value_exact.into(), } } } @@ -687,6 +699,8 @@ impl crate::thrift::TSerializable for Statistics { let mut f_4: Option = None; let mut f_5: Option> = None; let mut f_6: Option> = None; + let mut f_7: Option = None; + let mut f_8: Option = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -718,6 +732,14 @@ impl crate::thrift::TSerializable for Statistics { let val = i_prot.read_bytes()?; f_6 = Some(val); }, + 7 => { + let val = i_prot.read_bool()?; + f_7 = Some(val); + }, + 8 => { + let val = i_prot.read_bool()?; + f_8 = Some(val); + }, _ => { i_prot.skip(field_ident.field_type)?; }, @@ -732,6 +754,8 @@ impl crate::thrift::TSerializable for Statistics { distinct_count: f_4, max_value: f_5, min_value: f_6, + is_max_value_exact: f_7, + is_min_value_exact: f_8, }; Ok(ret) } @@ -768,6 +792,16 @@ impl crate::thrift::TSerializable for Statistics { o_prot.write_bytes(fld_var)?; o_prot.write_field_end()? 
    }
+    if let Some(fld_var) = self.is_max_value_exact {
+      o_prot.write_field_begin(&TFieldIdentifier::new("is_max_value_exact", TType::Bool, 7))?;
+      o_prot.write_bool(fld_var)?;
+      o_prot.write_field_end()?
+    }
+    if let Some(fld_var) = self.is_min_value_exact {
+      o_prot.write_field_begin(&TFieldIdentifier::new("is_min_value_exact", TType::Bool, 8))?;
+      o_prot.write_bool(fld_var)?;
+      o_prot.write_field_end()?
+    }
     o_prot.write_field_stop()?;
     o_prot.write_struct_end()
   }
@@ -996,6 +1030,43 @@ impl crate::thrift::TSerializable for DateType {
   }
 }
 
+//
+// Float16Type
+//
+
+#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct Float16Type {
+}
+
+impl Float16Type {
+  pub fn new() -> Float16Type {
+    Float16Type {}
+  }
+}
+
+impl crate::thrift::TSerializable for Float16Type {
+  fn read_from_in_protocol<T: TInputProtocol>(i_prot: &mut T) -> thrift::Result<Float16Type> {
+    i_prot.read_struct_begin()?;
+    loop {
+      let field_ident = i_prot.read_field_begin()?;
+      if field_ident.field_type == TType::Stop {
+        break;
+      }
+      i_prot.skip(field_ident.field_type)?;
+      i_prot.read_field_end()?;
+    }
+    i_prot.read_struct_end()?;
+    let ret = Float16Type {};
+    Ok(ret)
+  }
+  fn write_to_out_protocol<T: TOutputProtocol>(&self, o_prot: &mut T) -> thrift::Result<()> {
+    let struct_ident = TStructIdentifier::new("Float16Type");
+    o_prot.write_struct_begin(&struct_ident)?;
+    o_prot.write_field_stop()?;
+    o_prot.write_struct_end()
+  }
+}
+
 //
 // NullType
 //
@@ -1640,6 +1711,7 @@ pub enum LogicalType {
   JSON(JsonType),
   BSON(BsonType),
   UUID(UUIDType),
+  FLOAT16(Float16Type),
 }
 
 impl crate::thrift::TSerializable for LogicalType {
@@ -1745,6 +1817,13 @@ impl crate::thrift::TSerializable for LogicalType {
           }
           received_field_count += 1;
         },
+        15 => {
+          let val = Float16Type::read_from_in_protocol(i_prot)?;
+          if ret.is_none() {
+            ret = Some(LogicalType::FLOAT16(val));
+          }
+          received_field_count += 1;
+        },
         _ => {
           i_prot.skip(field_ident.field_type)?;
           received_field_count += 1;
@@ -1844,6 +1923,11 @@ impl crate::thrift::TSerializable for LogicalType {
           f.write_to_out_protocol(o_prot)?;
           o_prot.write_field_end()?;
         },
+        LogicalType::FLOAT16(ref f) => {
+          o_prot.write_field_begin(&TFieldIdentifier::new("FLOAT16", TType::Struct, 15))?;
+          f.write_to_out_protocol(o_prot)?;
+          o_prot.write_field_end()?;
+        },
       }
       o_prot.write_field_stop()?;
       o_prot.write_struct_end()
diff --git a/parquet/src/schema/printer.rs b/parquet/src/schema/printer.rs
index fe4757d41aed..e15ba311be21 100644
--- a/parquet/src/schema/printer.rs
+++ b/parquet/src/schema/printer.rs
@@ -270,6 +270,7 @@ fn print_logical_and_converted(
                 LogicalType::Enum => "ENUM".to_string(),
                 LogicalType::List => "LIST".to_string(),
                 LogicalType::Map => "MAP".to_string(),
+                LogicalType::Float16 => "FLOAT16".to_string(),
                 LogicalType::Unknown => "UNKNOWN".to_string(),
             },
             None => {
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 11c735420957..597ed971d476 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -356,6 +356,7 @@ impl<'a> PrimitiveTypeBuilder<'a> {
             (LogicalType::Json, PhysicalType::BYTE_ARRAY) => {}
             (LogicalType::Bson, PhysicalType::BYTE_ARRAY) => {}
             (LogicalType::Uuid, PhysicalType::FIXED_LEN_BYTE_ARRAY) => {}
+            (LogicalType::Float16, PhysicalType::FIXED_LEN_BYTE_ARRAY) => {}
             (a, b) => {
                 return Err(general_err!(
                     "Cannot annotate {:?} from {} for field '{}'",
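
For anyone who wants to exercise the new support end to end, here is a minimal round-trip sketch. It mirrors the `test_float16_roundtrip` test added above rather than introducing anything new; the only assumptions are that the `parquet` crate is built with its `arrow` feature and that `half` is the dependency version this patch adds.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Float16Array, RecordBatch};
use bytes::Bytes;
use half::f16;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A Float16 column is written as FIXED_LEN_BYTE_ARRAY(2) annotated with
    // the FLOAT16 logical type introduced by this change.
    let array = Float16Array::from_iter_values([f16::ONE, f16::PI, f16::NEG_ZERO]);
    let batch = RecordBatch::try_from_iter([("f16_col", Arc::new(array) as ArrayRef)])?;

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Reading back converts the FLOAT16 annotation to Arrow's Float16 type,
    // so the round trip preserves both schema and values.
    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
    let read = reader.next().unwrap()?;
    assert_eq!(read, batch);
    Ok(())
}
```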