Skip to content

Commit

Permalink
Support for read/write f16 Parquet to Arrow
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Oct 29, 2023
1 parent e3cce56 commit a38ca5a
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 172 deletions.
1 change: 1 addition & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tokio = { version = "1.0", optional = true, default-features = false, features =
hashbrown = { version = "0.14", default-features = false }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false }

[dev-dependencies]
base64 = { version = "0.21", default-features = false, features = ["std"] }
Expand Down
2 changes: 1 addition & 1 deletion parquet/regen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

REVISION=aeae80660c1d0c97314e9da837de1abdebd49c37
REVISION=46cc3a0647d301bb9579ca8dd2cc356caf2a72d2

SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)"

Expand Down
54 changes: 28 additions & 26 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
use arrow_array::{
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray,
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
IntervalDayTimeArray, IntervalYearMonthArray,
};
use arrow_buffer::{i256, Buffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType as ArrowType, IntervalUnit};
use half::f16;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -88,6 +89,14 @@ pub fn make_fixed_len_byte_array_reader(
));
}
}
ArrowType::Float16 => {
if byte_length != 2 {
return Err(general_err!(
"float 16 type must be 2 bytes, got {}",
byte_length
));
}
}
_ => {
return Err(general_err!(
"invalid data type for fixed length byte array reader - {}",
Expand Down Expand Up @@ -153,11 +162,10 @@ impl ArrayReader for FixedLenByteArrayReader {
fn consume_batch(&mut self) -> Result<ArrayRef> {
let record_data = self.record_reader.consume_record_data();

let array_data =
ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
.len(self.record_reader.num_values())
.add_buffer(record_data)
.null_bit_buffer(self.record_reader.consume_bitmap_buffer());
let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
.len(self.record_reader.num_values())
.add_buffer(record_data)
.null_bit_buffer(self.record_reader.consume_bitmap_buffer());

let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

Expand Down Expand Up @@ -188,26 +196,26 @@ impl ArrayReader for FixedLenByteArrayReader {
IntervalUnit::YearMonth => Arc::new(
binary
.iter()
.map(|o| {
o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))
})
.map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap())))
.collect::<IntervalYearMonthArray>(),
) as ArrayRef,
IntervalUnit::DayTime => Arc::new(
binary
.iter()
.map(|o| {
o.map(|b| {
i64::from_le_bytes(b[4..12].try_into().unwrap())
})
})
.map(|o| o.map(|b| i64::from_le_bytes(b[4..12].try_into().unwrap())))
.collect::<IntervalDayTimeArray>(),
) as ArrayRef,
IntervalUnit::MonthDayNano => {
return Err(nyi_err!("MonthDayNano intervals not supported"));
}
}
}
ArrowType::Float16 => Arc::new(
binary
.iter()
.map(|o| o.map(|b| f16::from_le_bytes(b[..2].try_into().unwrap())))
.collect::<Float16Array>(),
) as ArrayRef,
_ => Arc::new(binary) as ArrayRef,
};

Expand Down Expand Up @@ -278,9 +286,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
let slice = self.buffer.as_slice_mut();

let values_range = read_offset..read_offset + values_read;
for (value_pos, level_pos) in
values_range.rev().zip(iter_set_bits_rev(valid_mask))
{
for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
debug_assert!(level_pos >= value_pos);
if level_pos <= value_pos {
break;
Expand Down Expand Up @@ -376,8 +382,7 @@ impl ColumnValueDecoder for ValueDecoder {
let len = range.end - range.start;
match self.decoder.as_mut().unwrap() {
Decoder::Plain { offset, buf } => {
let to_read =
(len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
let to_read = (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
let end_offset = *offset + to_read * self.byte_length;
out.buffer
.extend_from_slice(&buf.as_ref()[*offset..end_offset]);
Expand Down Expand Up @@ -470,15 +475,12 @@ mod tests {
.build()
.unwrap();

let written = RecordBatch::try_from_iter([(
"list",
Arc::new(ListArray::from(data)) as ArrayRef,
)])
.unwrap();
let written =
RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
.unwrap();

let mut buffer = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
writer.write(&written).unwrap();
writer.close().unwrap();

Expand Down
48 changes: 48 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ mod tests {
use std::sync::Arc;

use bytes::Bytes;
use half::f16;
use num::PrimInt;
use rand::{thread_rng, Rng, RngCore};
use tempfile::tempfile;
Expand Down Expand Up @@ -924,6 +925,53 @@ mod tests {
.unwrap();
}

#[test]
fn test_float16_roundtrip() -> Result<()> {
    // A single nullable Float16 column.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "float16",
        ArrowDataType::Float16,
        true,
    )]));

    // Cover the special values (infinities, NaN, signed zeros, min/max,
    // epsilon) alongside a few ordinary mathematical constants.
    let values = [
        f16::EPSILON,
        f16::INFINITY,
        f16::MIN,
        f16::MAX,
        f16::NAN,
        f16::INFINITY,
        f16::NEG_INFINITY,
        f16::ONE,
        f16::NEG_ONE,
        f16::ZERO,
        f16::NEG_ZERO,
        f16::E,
        f16::PI,
        f16::FRAC_1_PI,
    ];
    let original = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Float16Array::from_iter_values(values))],
    )?;

    // Write the batch into an in-memory Parquet buffer...
    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&original)?;
    writer.close()?;

    // ...then read it back and expect a logically identical batch.
    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
    let round_tripped = reader.next().unwrap()?;
    assert_eq!(round_tripped, original);

    // The column must also come back as a concrete Float16Array.
    round_tripped
        .column(0)
        .as_any()
        .downcast_ref::<Float16Array>()
        .unwrap();

    Ok(())
}

struct RandFixedLenGen {}

impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
Expand Down
19 changes: 19 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,13 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
.unwrap();
get_decimal_256_array_slice(array, indices)
}
ArrowDataType::Float16 => {
let array = column
.as_any()
.downcast_ref::<arrow_array::Float16Array>()
.unwrap();
get_float_16_array_slice(array, indices)
}
_ => {
return Err(ParquetError::NYI(
"Attempting to write an Arrow type that is not yet implemented".to_string(),
Expand Down Expand Up @@ -867,6 +874,18 @@ fn get_decimal_256_array_slice(
values
}

/// Encodes the `f16` values at `indices` as their 2-byte little-endian
/// representation, each wrapped as a Parquet fixed-length byte array.
fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    indices
        .iter()
        .map(|&idx| {
            let le_bytes = array.value(idx).to_le_bytes().to_vec();
            FixedLenByteArray::from(ByteArray::from(le_bytes))
        })
        .collect()
}

fn get_fsb_array_slice(
array: &arrow_array::FixedSizeBinaryArray,
indices: &[usize],
Expand Down
Loading

0 comments on commit a38ca5a

Please sign in to comment.