From f49472eb93c4bf27aa20d8040c57539fc73d1059 Mon Sep 17 00:00:00 2001
From: Joseph Rance
Date: Mon, 4 Sep 2023 15:26:45 +0100
Subject: [PATCH 01/14] add and implement RecordReader trait for rust structs

---
 parquet/src/record/mod.rs           |   2 +
 parquet/src/record/record_reader.rs |  27 ++
 parquet_derive/README.md            |  51 +++-
 parquet_derive/src/lib.rs           |  87 ++++++-
 parquet_derive/src/parquet_field.rs | 365 +++++++++++++++++++++++++++-
 parquet_derive_test/src/lib.rs      |  81 +++++-
 6 files changed, 600 insertions(+), 13 deletions(-)
 create mode 100644 parquet/src/record/record_reader.rs

diff --git a/parquet/src/record/mod.rs b/parquet/src/record/mod.rs
index ce83cfa2b14a..27f84760fe95 100644
--- a/parquet/src/record/mod.rs
+++ b/parquet/src/record/mod.rs
@@ -20,6 +20,7 @@
 mod api;
 pub mod reader;
 mod record_writer;
+mod record_reader;
 mod triplet;
 
 pub use self::{
@@ -28,4 +29,5 @@ pub use self::{
         RowFormatter,
     },
     record_writer::RecordWriter,
+    record_reader::RecordReader,
 };
diff --git a/parquet/src/record/record_reader.rs b/parquet/src/record/record_reader.rs
new file mode 100644
index 000000000000..496be2b05f3d
--- /dev/null
+++ b/parquet/src/record/record_reader.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::super::errors::ParquetError;
+use super::super::file::reader::RowGroupReader;
+
+pub trait RecordReader<T> {
+    fn read_from_row_group(
+        &mut self,
+        row_group_reader: &mut dyn RowGroupReader,
+        max_records: usize,
+    ) -> Result<(), ParquetError>;
+}
\ No newline at end of file
diff --git a/parquet_derive/README.md b/parquet_derive/README.md
index b20721079c2d..b323d999fb14 100644
--- a/parquet_derive/README.md
+++ b/parquet_derive/README.md
@@ -19,9 +19,9 @@
 
 # Parquet Derive
 
-A crate for deriving `RecordWriter` for arbitrary, _simple_ structs. This does not generate writers for arbitrarily nested
-structures. It only works for primitives and a few generic structures and
-various levels of reference. Please see features checklist for what is currently
+A crate for deriving `RecordWriter` and `RecordReader` for arbitrary, _simple_ structs. This does not
+generate readers or writers for arbitrarily nested structures. It only works for primitives and a few
+generic structures and various levels of reference. Please see the features checklist for what is currently
 supported.
 
 Derive also has some support for the chrono time library. You must enable the `chrono` feature to get this support.
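
To make the contract of the new `RecordReader` trait above concrete, a hand-written implementation might look like the sketch below. The `Sample` struct, its single `INT32` column at index 0, and the error message are illustrative assumptions; the derive macro added later in this patch emits code of essentially this shape, using the same `get_column_reader` / `read_records` calls:

```rust
use parquet::column::reader::ColumnReader;
use parquet::errors::ParquetError;
use parquet::file::reader::RowGroupReader;
use parquet::record::RecordReader;

// Hypothetical record type, for illustration only.
struct Sample {
    id: i32,
}

impl RecordReader<Sample> for &mut [Sample] {
    fn read_from_row_group(
        &mut self,
        row_group_reader: &mut dyn RowGroupReader,
        max_records: usize,
    ) -> Result<(), ParquetError> {
        // Read up to `max_records` raw values out of column 0...
        let mut vals = vec![0i32; max_records];
        if let Ok(ColumnReader::Int32ColumnReader(mut typed)) =
            row_group_reader.get_column_reader(0)
        {
            typed.read_records(max_records, None, None, vals.as_mut_slice())?;
        } else {
            return Err(ParquetError::General("Failed to get next column".into()));
        }
        // ...then copy them into the destination records, field by field.
        for (r, v) in self.iter_mut().zip(vals) {
            r.id = v;
        }
        Ok(())
    }
}
```
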
@@ -77,16 +77,55 @@ writer.close_row_group(row_group).unwrap();
 writer.close().unwrap();
 ```
 
+Example usage of deriving a `RecordReader` for your struct:
+
+```rust
+use parquet::file::{serialized_reader::SerializedFileReader, reader::FileReader};
+use parquet_derive::ParquetRecordReader;
+
+#[derive(ParquetRecordReader)]
+struct ACompleteRecord {
+    pub a_bool: bool,
+    pub a_string: String,
+    pub i16: i16,
+    pub i32: i32,
+    pub u64: u64,
+    pub isize: isize,
+    pub float: f32,
+    pub double: f64,
+    pub now: chrono::NaiveDateTime,
+    pub byte_vec: Vec<u8>,
+}
+
+// Initialize your parquet file
+let reader = SerializedFileReader::new(file).unwrap();
+let mut row_group = reader.get_row_group(0).unwrap();
+
+// create your records to read into
+let mut chunks = vec![ACompleteRecord{ ... }];
+
+// The derived `RecordReader` takes over here
+chunks.as_mut_slice().read_from_row_group(&mut *row_group, 2).unwrap();
+```
+
 ## Features
 
 - [x] Support writing `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>`
 - [ ] Support writing dictionaries
 - [x] Support writing logical types like timestamp
-- [x] Derive definition_levels for `Option`
-- [ ] Derive definition levels for nested structures
+- [x] Derive definition_levels for `Option` for writing
+- [ ] Derive definition levels for nested structures for writing
 - [ ] Derive writing tuple struct
 - [ ] Derive writing `tuple` container types
 
+- [x] Support reading `String`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>`
+- [ ] Support reading/writing dictionaries
+- [x] Support reading/writing logical types like timestamp
+- [ ] Derive definition_levels for `Option` for reading
+- [ ] Derive definition levels for nested structures for reading
+- [ ] Derive reading/writing tuple struct
+- [ ] Derive reading/writing `tuple` container types
+
 ## Requirements
 
 - Same as `parquet-rs`
@@ -103,4 +142,4 @@ To compile and view in the browser, run `cargo doc --no-deps --open`.
 
 ## License
 
-Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
+Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
\ No newline at end of file
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index c6641cd8091d..fd1659c5d4f4 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -44,7 +44,7 @@ mod parquet_field;
 /// use parquet::file::writer::SerializedFileWriter;
 ///
 /// use std::sync::Arc;
-//
+///
 /// #[derive(ParquetRecordWriter)]
 /// struct ACompleteRecord<'a> {
 ///   pub a_bool: bool,
@@ -137,3 +137,88 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
     }
   }).into()
 }
+
+/// Derive flat, simple RecordReader implementations. Works by parsing
+/// a struct tagged with `#[derive(ParquetRecordReader)]` and emitting
+/// the correct reading code for each field of the struct. Column readers
+/// are generated in the order they are defined.
+///
+/// It is up to the programmer to keep the order of the struct
+/// fields lined up with the schema.
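+/// The generated implementation fetches columns by index (via
+/// `get_column_reader(i)`), not by name, so a misordered field will be
+/// filled from the wrong column, or panic if the physical types disagree.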
+///
+/// Example:
+///
+/// ```ignore
+/// use parquet::file::{serialized_reader::SerializedFileReader, reader::FileReader};
+/// use parquet_derive::{ParquetRecordReader};
+///
+/// #[derive(ParquetRecordReader)]
+/// struct ACompleteRecord {
+///   pub a_bool: bool,
+///   pub a_string: String,
+/// }
+///
+/// pub fn read_some_records() -> Vec<ACompleteRecord> {
+///   let mut samples = vec![
+///     ACompleteRecord {
+///       a_bool: true,
+///       a_string: String::from("I'm true"),
+///     }
+///   ];
+///
+///   let reader = SerializedFileReader::new(file).unwrap();
+///   let mut row_group = reader.get_row_group(0).unwrap();
+///   samples.as_mut_slice().read_from_row_group(&mut *row_group, 2).unwrap();
+///   samples
+/// }
+/// ```
+///
+#[proc_macro_derive(ParquetRecordReader)]
+pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
+
+    let input: DeriveInput = parse_macro_input!(input as DeriveInput);
+    let fields = match input.data {
+        Data::Struct(DataStruct { fields, .. }) => fields,
+        Data::Enum(_) => unimplemented!("Enum currently is not supported"),
+        Data::Union(_) => unimplemented!("Union currently is not supported"),
+    };
+
+    let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect();
+    let reader_snippets: Vec<proc_macro2::TokenStream> =
+        field_infos.iter().map(|x| x.reader_snippet()).collect();
+    let i: Vec<_> = (0..reader_snippets.len()).collect();
+
+    let derived_for = input.ident;
+    let generics = input.generics;
+
+    let convertable = parquet_field::get_convertable_quote();
+
+    (quote! {
+    impl #generics ::parquet::record::RecordReader<#derived_for #generics> for &mut [#derived_for #generics] {
+        fn read_from_row_group(
+            &mut self,
+            row_group_reader: &mut dyn ::parquet::file::reader::RowGroupReader,
+            max_records: usize,
+        ) -> Result<(), ::parquet::errors::ParquetError> {
+            use ::parquet::column::reader::ColumnReader;
+
+            #convertable
+
+            let mut row_group_reader = row_group_reader;
+            let records = self; // Used by all the reader snippets to be more clear
+
+            #(
+                {
+                    if let Ok(mut column_reader) = row_group_reader.get_column_reader(#i) {
+                        #reader_snippets
+                    } else {
+                        return Err(::parquet::errors::ParquetError::General("Failed to get next column".into()))
+                    }
+                }
+            );*
+
+            Ok(())
+        }
+    }
+  }).into()
+}
\ No newline at end of file
diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs
index ea6878283a33..570abce75a35 100644
--- a/parquet_derive/src/parquet_field.rs
+++ b/parquet_derive/src/parquet_field.rs
@@ -223,6 +223,78 @@ impl Field {
         }
     }
 
+    /// Takes the parsed field of the struct and emits a valid
+    /// column reader snippet. Should match exactly what you
+    /// would write by hand.
+    ///
+    /// Can only generate readers for basic structs, for example:
+    ///
+    /// struct Record {
+    ///   a_bool: bool
+    /// }
+    ///
+    /// but not
+    ///
+    /// struct UnsupportedNestedRecord {
+    ///   a_property: bool,
+    ///   nested_record: Record
+    /// }
+    ///
+    /// because this parsing logic is not sophisticated enough for definition
+    /// levels beyond 2.
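+    /// (In Parquet terms, a required flat field sits at definition level 0,
+    /// and each `Option` wrapper or optional nesting level adds one.)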
+ /// + /// `Option` types and references not supported + pub fn reader_snippet(&self) -> proc_macro2::TokenStream { + use parquet::basic::Type as BasicType; + + let ident = &self.ident; + let column_reader = self.ty.column_reader(); + let parquet_type = self.ty.physical_type_as_rust(); + + let default_type = match self.ty.physical_type() { + BasicType::BYTE_ARRAY | BasicType::FIXED_LEN_BYTE_ARRAY => quote!{ parquet::data_type::ByteArray::new() }, + BasicType::BOOLEAN => quote!{ false }, + BasicType::FLOAT | BasicType::DOUBLE => quote!{ 0. }, + BasicType::INT32 | BasicType::INT64 | BasicType::INT96 => quote!{ 0 }, + }; + + let write_batch_expr = quote! { + let mut vals_vec = Vec::new(); + vals_vec.resize(max_records, #default_type); + let mut vals: &mut [#parquet_type] = vals_vec.as_mut_slice(); + if let #column_reader(mut typed) = column_reader { + typed.read_records(max_records, None, None, vals)?; + } else { + panic!("Schema and struct disagree on type for {}", stringify!{#ident}); + } + }; + + let vals_writer = match &self.ty { + Type::TypePath(_) => self.copied_direct_fields(), + Type::Reference(_, ref first_type) => match **first_type { + Type::TypePath(_) => self.copied_direct_fields(), + Type::Slice(ref second_type) => match **second_type { + Type::TypePath(_) => self.copied_direct_fields(), + ref f => unimplemented!("Unsupported: {:#?}", f), + }, + ref f => unimplemented!("Unsupported: {:#?}", f), + }, + Type::Vec(ref first_type) => match **first_type { + Type::TypePath(_) => self.copied_direct_fields(), + ref f => unimplemented!("Unsupported: {:#?}", f), + }, + f => unimplemented!("Unsupported: {:#?}", f), + }; + + quote! { + { + #write_batch_expr + + #vals_writer + } + } + } + pub fn parquet_type(&self) -> proc_macro2::TokenStream { // TODO: Support group types // TODO: Add length if dealing with fixedlenbinary @@ -354,6 +426,51 @@ impl Field { } } + fn copied_direct_fields(&self) -> proc_macro2::TokenStream { + + let field_name = &self.ident; + let is_a_byte_buf = self.is_a_byte_buf; + let is_a_timestamp = + self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime); + let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate); + let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid); + + let value = if is_a_timestamp { + quote! { ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap() } + } else if is_a_date { + quote! { ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i] + + ((::chrono::naive::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap() + - ::chrono::naive::NaiveDate::from_ymd_opt(0, 12, 31).unwrap()).num_days()) as i32).unwrap() } + } else if is_a_uuid { + quote! { ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap() } + } else if is_a_byte_buf { + quote! { vals[i].data().convert() } + } else { + quote! { vals[i].convert() } + }; + + // The below code would support references in field types, but it prevents the compiler + // from doing type inference from the compiler, so it is necessary to work out the target + // type and annotate it for this to be possible. + // + // if let Type::Reference(_, x) = &self.ty { + // if **x == Type::TypePath(syn::parse_str("str").unwrap()) { + // value = quote!{ *(#value) }; + // } + // if let Type::Slice(..) = **x { + // value = quote!{ *(#value) }; + // } + // value = quote!{ &*std::rc::Rc::new(#value) }; + // } + + quote! 
{
+            let length = std::cmp::min(vals.len(), records.len());
+            for (i, r) in &mut records[..length].iter_mut().enumerate() {
+                r.#field_name = #value;
+            }
+        }
+    }
+
     fn optional_definition_levels(&self) -> proc_macro2::TokenStream {
         let field_name = &self.ident;
 
@@ -402,6 +519,29 @@ impl Type {
         }
     }
 
+    /// Takes a rust type and returns the appropriate
+    /// parquet-rs column reader
+    fn column_reader(&self) -> syn::TypePath {
+        use parquet::basic::Type as BasicType;
+
+        match self.physical_type() {
+            BasicType::BOOLEAN => {
+                syn::parse_quote!(ColumnReader::BoolColumnReader)
+            }
+            BasicType::INT32 => syn::parse_quote!(ColumnReader::Int32ColumnReader),
+            BasicType::INT64 => syn::parse_quote!(ColumnReader::Int64ColumnReader),
+            BasicType::INT96 => syn::parse_quote!(ColumnReader::Int96ColumnReader),
+            BasicType::FLOAT => syn::parse_quote!(ColumnReader::FloatColumnReader),
+            BasicType::DOUBLE => syn::parse_quote!(ColumnReader::DoubleColumnReader),
+            BasicType::BYTE_ARRAY => {
+                syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
+            }
+            BasicType::FIXED_LEN_BYTE_ARRAY => {
+                syn::parse_quote!(ColumnReader::FixedLenByteArrayColumnReader)
+            }
+        }
+    }
+
     /// Helper to simplify a nested field definition to its leaf type
     ///
     /// Ex:
@@ -524,6 +664,21 @@ impl Type {
         }
     }
 
+    fn physical_type_as_rust(&self) -> proc_macro2::TokenStream {
+        use parquet::basic::Type as BasicType;
+
+        match self.physical_type() {
+            BasicType::BOOLEAN => quote!{ bool },
+            BasicType::INT32 => quote!{ i32 },
+            BasicType::INT64 => quote!{ i64 },
+            BasicType::INT96 => unimplemented!("96-bit int currently is not supported"),
+            BasicType::FLOAT => quote!{ f32 },
+            BasicType::DOUBLE => quote!{ f64 },
+            BasicType::BYTE_ARRAY => quote!{ ::parquet::data_type::ByteArray },
+            BasicType::FIXED_LEN_BYTE_ARRAY => quote!{ ::parquet::data_type::ByteArray },
+        }
+    }
+
     fn logical_type(&self) -> proc_macro2::TokenStream {
         let last_part = self.last_part();
         let leaf_type = self.leaf_type_recursive();
@@ -682,6 +837,66 @@ impl Type {
     }
 }
 
+// returns quote of function needed to convert from parquet
+// types back into the types used in the struct fields.
+pub fn get_convertable_quote() -> proc_macro2::TokenStream {
+    quote! {
+        trait Convertable<T> {
+            fn convert(self) -> T;
+        }
+
+        macro_rules! convert_as! {
+            ($from:ty, $to:ty) => {
+                impl Convertable<$to> for $from {
+                    fn convert(self) -> $to {
+                        self as $to
+                    }
+                }
+            };
+        }
+
+        convert_as!(i32, u8);
+        convert_as!(i32, u16);
+        convert_as!(i32, u32);
+        convert_as!(i32, i8);
+        convert_as!(i32, i16);
+        convert_as!(i32, i32);
+        convert_as!(i32, usize);
+        convert_as!(i32, isize);
+        convert_as!(i64, u64);
+        convert_as!(i64, i64);
+        convert_as!(i64, usize);
+        convert_as!(i64, isize);
+        convert_as!(bool, bool);
+        convert_as!(f32, f32);
+        convert_as!(f64, f64);
+
+        impl Convertable<String> for &[u8] {
+            fn convert(self) -> String {
+                String::from(std::str::from_utf8(self).expect("invalid UTF-8 sequence"))
+            }
+        }
+
+        impl Convertable<Vec<u8>> for &[u8] {
+            fn convert(self) -> Vec<u8> {
+                self.to_vec()
+            }
+        }
+
+        impl<'a> Convertable<&'a str> for &'a [u8] {
+            fn convert(self) -> &'a str {
+                std::str::from_utf8(self).expect("invalid UTF-8 sequence")
+            }
+        }
+
+        impl<'a> Convertable<&'a [u8]> for &'a [u8] {
+            fn convert(self) -> &'a [u8] {
+                self
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -725,6 +940,38 @@ mod test {
         )
     }
 
+    #[test]
+    fn test_generating_a_simple_reader_snippet() {
+        let snippet: proc_macro2::TokenStream = quote! 
{ + struct ABoringStruct { + counter: usize, + } + }; + + let fields = extract_fields(snippet); + let counter = Field::from(&fields[0]); + + let snippet = counter.reader_snippet().to_string(); + assert_eq!(snippet, + (quote!{ + { + let mut vals_vec = Vec::new(); + vals_vec.resize(max_records, 0); + let mut vals: &mut[i64] = vals_vec.as_mut_slice(); + if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { + typed.read_records(max_records, None, None, vals)?; + } else { + panic!("Schema and struct disagree on type for {}", stringify!{ counter }); + } + let length = std::cmp::min(vals.len(), records.len()); + for (i, r) in &mut records[..length].iter_mut().enumerate() { + r.counter = vals[i].convert(); + } + } + }).to_string() + ) + } + #[test] fn test_optional_to_writer_snippet() { let struct_def: proc_macro2::TokenStream = quote! { @@ -834,6 +1081,32 @@ mod test { ); } + #[test] + fn test_converting_to_column_reader_type() { + let snippet: proc_macro2::TokenStream = quote! { + struct ABasicStruct { + yes_no: bool, + name: String, + } + }; + + let fields = extract_fields(snippet); + let processed: Vec<_> = fields.iter().map(Field::from).collect(); + + let column_readers: Vec<_> = processed + .iter() + .map(|field| field.ty.column_reader()) + .collect(); + + assert_eq!( + column_readers, + vec![ + syn::parse_quote!(ColumnReader::BoolColumnReader), + syn::parse_quote!(ColumnReader::ByteArrayColumnReader) + ] + ); + } + #[test] fn convert_basic_struct() { let snippet: proc_macro2::TokenStream = quote! { @@ -1007,7 +1280,7 @@ mod test { } #[test] - fn test_chrono_timestamp_millis() { + fn test_chrono_timestamp_millis_write() { let snippet: proc_macro2::TokenStream = quote! { struct ATimestampStruct { henceforth: chrono::NaiveDateTime, @@ -1050,7 +1323,35 @@ mod test { } #[test] - fn test_chrono_date() { + fn test_chrono_timestamp_millis_read() { + let snippet: proc_macro2::TokenStream = quote! { + struct ATimestampStruct { + henceforth: chrono::NaiveDateTime, + } + }; + + let fields = extract_fields(snippet); + let when = Field::from(&fields[0]); + assert_eq!(when.reader_snippet().to_string(),(quote!{ + { + let mut vals_vec = Vec::new(); + vals_vec.resize(max_records, 0); + let mut vals: &mut[i64] = vals_vec.as_mut_slice(); + if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { + typed.read_records(max_records, None, None, vals)?; + } else { + panic!("Schema and struct disagree on type for {}", stringify!{ henceforth }); + } + let length = std::cmp::min(vals.len(), records.len()); + for (i, r) in &mut records[..length].iter_mut().enumerate() { + r.henceforth = ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap(); + } + } + }).to_string()); + } + + #[test] + fn test_chrono_date_write() { let snippet: proc_macro2::TokenStream = quote! { struct ATimestampStruct { henceforth: chrono::NaiveDate, @@ -1093,7 +1394,37 @@ mod test { } #[test] - fn test_uuid() { + fn test_chrono_date_read() { + let snippet: proc_macro2::TokenStream = quote! 
{ + struct ATimestampStruct { + henceforth: chrono::NaiveDate, + } + }; + + let fields = extract_fields(snippet); + let when = Field::from(&fields[0]); + assert_eq!(when.reader_snippet().to_string(),(quote!{ + { + let mut vals_vec = Vec::new(); + vals_vec.resize(max_records, 0); + let mut vals: &mut [i32] = vals_vec.as_mut_slice(); + if let ColumnReader::Int32ColumnReader(mut typed) = column_reader { + typed.read_records(max_records, None, None, vals)?; + } else { + panic!("Schema and struct disagree on type for {}", stringify!{ henceforth }); + } + let length = std::cmp::min(vals.len(), records.len()); + for (i, r) in &mut records[..length].iter_mut().enumerate() { + r.henceforth = ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i] + + ((::chrono::naive::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap() + - ::chrono::naive::NaiveDate::from_ymd_opt(0, 12, 31).unwrap()).num_days()) as i32).unwrap(); + } + } + }).to_string()); + } + + #[test] + fn test_uuid_write() { let snippet: proc_macro2::TokenStream = quote! { struct AUuidStruct { unique_id: uuid::Uuid, @@ -1135,6 +1466,34 @@ mod test { }).to_string()); } + #[test] + fn test_uuid_read() { + let snippet: proc_macro2::TokenStream = quote! { + struct AUuidStruct { + unique_id: uuid::Uuid, + } + }; + + let fields = extract_fields(snippet); + let when = Field::from(&fields[0]); + assert_eq!(when.reader_snippet().to_string(),(quote!{ + { + let mut vals_vec = Vec::new(); + vals_vec.resize(max_records, parquet::data_type::ByteArray::new()); + let mut vals: &mut [::parquet::data_type::ByteArray] = vals_vec.as_mut_slice(); + if let ColumnReader::ByteArrayColumnReader(mut typed) = column_reader { + typed.read_records(max_records, None, None, vals)?; + } else { + panic!("Schema and struct disagree on type for {}", stringify!{ unique_id }); + } + let length = std::cmp::min(vals.len(), records.len()); + for (i, r) in &mut records[..length].iter_mut().enumerate() { + r.unique_id = ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap(); + } + } + }).to_string()); + } + #[test] fn test_converted_type() { let snippet: proc_macro2::TokenStream = quote! 
{
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index f4f8be1e0d8c..2fef4c507cf6 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -17,7 +17,7 @@
 
 #![allow(clippy::approx_constant)]
 
-use parquet_derive::ParquetRecordWriter;
+use parquet_derive::{ParquetRecordWriter, ParquetRecordReader};
 
 #[derive(ParquetRecordWriter)]
 struct ACompleteRecord<'a> {
@@ -49,14 +49,29 @@ struct ACompleteRecord<'a> {
     pub borrowed_maybe_borrowed_byte_vec: &'a Option<&'a [u8]>,
 }
 
-#[cfg(test)]
+#[derive(PartialEq, ParquetRecordWriter, ParquetRecordReader, Debug)]
+struct APartiallyCompleteRecord {
+    pub a_bool: bool,
+    pub a_string: String,
+    pub i16: i16,
+    pub i32: i32,
+    pub u64: u64,
+    pub isize: isize,
+    pub float: f32,
+    pub double: f64,
+    pub now: chrono::NaiveDateTime,
+    pub date: chrono::NaiveDate,
+    pub byte_vec: Vec<u8>,
+}
+
+#[cfg(test)] 
 mod tests {
     use super::*;
 
     use std::{env, fs, io::Write, sync::Arc};
 
     use parquet::{
-        file::writer::SerializedFileWriter, record::RecordWriter,
+        file::writer::SerializedFileWriter, record::{RecordWriter, RecordReader},
         schema::parser::parse_message_type,
     };
 
@@ -148,6 +163,66 @@ mod tests {
         writer.close().unwrap();
     }
 
+    #[test]
+    fn test_parquet_derive_read_write_combined() {
+        let file = get_temp_file("test_parquet_derive_combined", &[]);
+
+        let mut drs: Vec<APartiallyCompleteRecord> = vec![APartiallyCompleteRecord {
+            a_bool: true,
+            a_string: "a string".into(),
+            i16: -45,
+            i32: 456,
+            u64: 4563424,
+            isize: -365,
+            float: 3.5,
+            double: std::f64::NAN,
+            now: chrono::Utc::now().naive_local(),
+            date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
+            byte_vec: vec![0x65, 0x66, 0x67],
+        }];
+
+        let mut out: Vec<APartiallyCompleteRecord> = vec![APartiallyCompleteRecord {
+            a_bool: false,
+            a_string: "a different string".into(),
+            i16: -450,
+            i32: 4560,
+            u64: 45634240,
+            isize: -3650,
+            float: 30.5,
+            double: 10.,
+            now: chrono::Utc::now().naive_local(),
+            date: chrono::naive::NaiveDate::from_ymd_opt(1982, 1, 27).unwrap(),
+            byte_vec: vec![0x17, 0x18, 0x19],
+        }];
+
+        use parquet::file::{serialized_reader::SerializedFileReader, reader::FileReader};
+
+        let generated_schema = drs.as_slice().schema().unwrap();
+
+        let props = Default::default();
+        let mut writer =
+            SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap();
+
+        let mut row_group = writer.next_row_group().unwrap();
+        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
+        row_group.close().unwrap();
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(file).unwrap();
+
+        let mut row_group = reader.get_row_group(0).unwrap();
+        out.as_mut_slice().read_from_row_group(&mut *row_group, 2).unwrap();
+
+        // correct for rounding error when writing milliseconds
+        drs[0].now = chrono::naive::NaiveDateTime::from_timestamp_millis(drs[0].now.timestamp_millis()).unwrap();
+
+        assert!(out[0].double.is_nan()); // these three lines are necessary because NAN != NAN
+        out[0].double = 0.;
+        drs[0].double = 0.;
+
+        assert_eq!(drs[0], out[0]);
+    }
+
     /// Returns file handle for a temp file in 'target' directory with a provided content
     pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
         // build tmp path to a file in "target/debug/testdata"

From dd8a3b0f3a46d48cade54e558b06654f11f87d58 Mon Sep 17 00:00:00 2001
From: Joseph Rance <56409230+Joseph-Rance@users.noreply.github.com>
Date: Mon, 4 Sep 2023 17:13:53 +0100
Subject: [PATCH 02/14] Fix typo in comment

---
 parquet_derive/src/parquet_field.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 
deletions(-) diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 570abce75a35..d4cdc8a8ae67 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -450,8 +450,8 @@ impl Field { }; // The below code would support references in field types, but it prevents the compiler - // from doing type inference from the compiler, so it is necessary to work out the target - // type and annotate it for this to be possible. + // from doing type inference, so it is necessary to work out the target type and annotate + // it for this to be possible. // // if let Type::Reference(_, x) = &self.ty { // if **x == Type::TypePath(syn::parse_str("str").unwrap()) { From eb1402dd4bb47c6f133deb15398fc5aefde7a1e2 Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Tue, 5 Sep 2023 11:24:00 +0100 Subject: [PATCH 03/14] run cargo fmt --- parquet/src/record/mod.rs | 4 ++-- parquet/src/record/record_reader.rs | 4 ++-- parquet_derive/src/lib.rs | 31 ++++++++++++++--------------- parquet_derive/src/parquet_field.rs | 29 ++++++++++++++------------- parquet_derive_test/src/lib.rs | 23 ++++++++++++++------- 5 files changed, 50 insertions(+), 41 deletions(-) diff --git a/parquet/src/record/mod.rs b/parquet/src/record/mod.rs index 27f84760fe95..6301c4fc4147 100644 --- a/parquet/src/record/mod.rs +++ b/parquet/src/record/mod.rs @@ -19,8 +19,8 @@ mod api; pub mod reader; -mod record_writer; mod record_reader; +mod record_writer; mod triplet; pub use self::{ @@ -28,6 +28,6 @@ pub use self::{ Field, List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowColumnIter, RowFormatter, }, - record_writer::RecordWriter, record_reader::RecordReader, + record_writer::RecordWriter, }; diff --git a/parquet/src/record/record_reader.rs b/parquet/src/record/record_reader.rs index 496be2b05f3d..dacd37b3e585 100644 --- a/parquet/src/record/record_reader.rs +++ b/parquet/src/record/record_reader.rs @@ -23,5 +23,5 @@ pub trait RecordReader { &mut self, row_group_reader: &mut dyn RowGroupReader, max_records: usize, - ) -> Result<(), ParquetError>; -} \ No newline at end of file + ) -> Result<(), ParquetError>; +} diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index fd1659c5d4f4..dfac17c51c1c 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -175,25 +175,24 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke /// #[proc_macro_derive(ParquetRecordReader)] pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input: DeriveInput = parse_macro_input!(input as DeriveInput); + let fields = match input.data { + Data::Struct(DataStruct { fields, .. }) => fields, + Data::Enum(_) => unimplemented!("Enum currently is not supported"), + Data::Union(_) => unimplemented!("Union currently is not supported"), + }; - let input: DeriveInput = parse_macro_input!(input as DeriveInput); - let fields = match input.data { - Data::Struct(DataStruct { fields, .. 
}) => fields, - Data::Enum(_) => unimplemented!("Enum currently is not supported"), - Data::Union(_) => unimplemented!("Union currently is not supported"), - }; - - let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect(); - let reader_snippets: Vec = - field_infos.iter().map(|x| x.reader_snippet()).collect(); - let i: Vec<_> = (0..reader_snippets.len()).collect(); + let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect(); + let reader_snippets: Vec = + field_infos.iter().map(|x| x.reader_snippet()).collect(); + let i: Vec<_> = (0..reader_snippets.len()).collect(); - let derived_for = input.ident; - let generics = input.generics; + let derived_for = input.ident; + let generics = input.generics; - let convertable = parquet_field::get_convertable_quote(); + let convertable = parquet_field::get_convertable_quote(); - (quote! { + (quote! { impl #generics ::parquet::record::RecordReader<#derived_for #generics> for &mut [#derived_for #generics] { fn read_from_row_group( &mut self, @@ -221,4 +220,4 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke } } }).into() -} \ No newline at end of file +} diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index d4cdc8a8ae67..5a48d8d6921d 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -252,10 +252,12 @@ impl Field { let parquet_type = self.ty.physical_type_as_rust(); let default_type = match self.ty.physical_type() { - BasicType::BYTE_ARRAY | BasicType::FIXED_LEN_BYTE_ARRAY => quote!{ parquet::data_type::ByteArray::new() }, - BasicType::BOOLEAN => quote!{ false }, - BasicType::FLOAT | BasicType::DOUBLE => quote!{ 0. }, - BasicType::INT32 | BasicType::INT64 | BasicType::INT96 => quote!{ 0 }, + BasicType::BYTE_ARRAY | BasicType::FIXED_LEN_BYTE_ARRAY => { + quote! { parquet::data_type::ByteArray::new() } + } + BasicType::BOOLEAN => quote! { false }, + BasicType::FLOAT | BasicType::DOUBLE => quote! { 0. }, + BasicType::INT32 | BasicType::INT64 | BasicType::INT96 => quote! { 0 }, }; let write_batch_expr = quote! { @@ -427,7 +429,6 @@ impl Field { } fn copied_direct_fields(&self) -> proc_macro2::TokenStream { - let field_name = &self.ident; let is_a_byte_buf = self.is_a_byte_buf; let is_a_timestamp = @@ -668,14 +669,14 @@ impl Type { use parquet::basic::Type as BasicType; match self.physical_type() { - BasicType::BOOLEAN => quote!{ bool }, - BasicType::INT32 => quote!{ i32 }, - BasicType::INT64 => quote!{ i64 }, + BasicType::BOOLEAN => quote! { bool }, + BasicType::INT32 => quote! { i32 }, + BasicType::INT64 => quote! { i64 }, BasicType::INT96 => unimplemented!("96-bit int currently is not supported"), - BasicType::FLOAT => quote!{ f32 }, - BasicType::DOUBLE => quote!{ f64 }, - BasicType::BYTE_ARRAY => quote!{ ::parquet::data_type::ByteArray }, - BasicType::FIXED_LEN_BYTE_ARRAY => quote!{ ::parquet::data_type::ByteArray }, + BasicType::FLOAT => quote! { f32 }, + BasicType::DOUBLE => quote! { f64 }, + BasicType::BYTE_ARRAY => quote! { ::parquet::data_type::ByteArray }, + BasicType::FIXED_LEN_BYTE_ARRAY => quote! { ::parquet::data_type::ByteArray }, } } @@ -844,7 +845,7 @@ pub fn get_convertable_quote() -> proc_macro2::TokenStream { trait Convertable { fn convert(self) -> T; } - + macro_rules! 
convert_as { ($from:ty, $to:ty) => { impl Convertable<$to> for $from { @@ -854,7 +855,7 @@ pub fn get_convertable_quote() -> proc_macro2::TokenStream { } }; } - + convert_as!(i32, u8); convert_as!(i32, u16); convert_as!(i32, u32); diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index 2fef4c507cf6..58bdaf23bd1a 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -17,7 +17,7 @@ #![allow(clippy::approx_constant)] -use parquet_derive::{ParquetRecordWriter, ParquetRecordReader}; +use parquet_derive::{ParquetRecordReader, ParquetRecordWriter}; #[derive(ParquetRecordWriter)] struct ACompleteRecord<'a> { @@ -64,14 +64,15 @@ struct APartiallyCompleteRecord { pub byte_vec: Vec, } -#[cfg(test)] +#[cfg(test)] mod tests { use super::*; use std::{env, fs, io::Write, sync::Arc}; use parquet::{ - file::writer::SerializedFileWriter, record::{RecordWriter, RecordReader}, + file::writer::SerializedFileWriter, + record::{RecordReader, RecordWriter}, schema::parser::parse_message_type, }; @@ -195,13 +196,16 @@ mod tests { byte_vec: vec![0x17, 0x18, 0x19], }]; - use parquet::file::{serialized_reader::SerializedFileReader, reader::FileReader}; + use parquet::file::{ + reader::FileReader, serialized_reader::SerializedFileReader, + }; let generated_schema = drs.as_slice().schema().unwrap(); let props = Default::default(); let mut writer = - SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap(); + SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props) + .unwrap(); let mut row_group = writer.next_row_group().unwrap(); drs.as_slice().write_to_row_group(&mut row_group).unwrap(); @@ -211,10 +215,15 @@ mod tests { let reader = SerializedFileReader::new(file).unwrap(); let mut row_group = reader.get_row_group(0).unwrap(); - out.as_mut_slice().read_from_row_group(&mut *row_group, 2).unwrap(); + out.as_mut_slice() + .read_from_row_group(&mut *row_group, 2) + .unwrap(); // correct for rounding error when writing milliseconds - drs[0].now = chrono::naive::NaiveDateTime::from_timestamp_millis(drs[0].now.timestamp_millis()).unwrap(); + drs[0].now = chrono::naive::NaiveDateTime::from_timestamp_millis( + drs[0].now.timestamp_millis(), + ) + .unwrap(); assert!(out[0].double.is_nan()); // these three lines are necessary because NAN != NAN out[0].double = 0.; From 9f2dd9f901462b6221bc7ba9fab04343c85469b6 Mon Sep 17 00:00:00 2001 From: joseph rance Date: Tue, 19 Sep 2023 10:39:32 +0100 Subject: [PATCH 04/14] partially solve issues raised in review --- parquet/src/record/record_reader.rs | 1 + parquet/src/record/record_writer.rs | 2 + parquet_derive/README.md | 2 +- parquet_derive/src/lib.rs | 4 - parquet_derive/src/parquet_field.rs | 207 +++++++++++----------------- parquet_derive_test/src/lib.rs | 21 ++- 6 files changed, 97 insertions(+), 140 deletions(-) diff --git a/parquet/src/record/record_reader.rs b/parquet/src/record/record_reader.rs index dacd37b3e585..938c14b29406 100644 --- a/parquet/src/record/record_reader.rs +++ b/parquet/src/record/record_reader.rs @@ -18,6 +18,7 @@ use super::super::errors::ParquetError; use super::super::file::reader::RowGroupReader; +/// read up to `max_records` records from `row_group_reader` into `self` pub trait RecordReader { fn read_from_row_group( &mut self, diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs index 62099051f513..5a749dec407a 100644 --- a/parquet/src/record/record_writer.rs +++ b/parquet/src/record/record_writer.rs @@ -20,6 
+20,8 @@ use crate::schema::types::TypePtr; use super::super::errors::ParquetError; use super::super::file::writer::SerializedRowGroupWriter; +/// `write_to_row_group` writes from `self` into `row_group_writer` +/// `schema` builds the schema used by `row_group_writer` pub trait RecordWriter { fn write_to_row_group( &self, diff --git a/parquet_derive/README.md b/parquet_derive/README.md index b323d999fb14..fd121f28d0ed 100644 --- a/parquet_derive/README.md +++ b/parquet_derive/README.md @@ -118,7 +118,7 @@ chunks.as_mut_slice().read_from_row_group(&mut *row_group, 2).unwrap(); - [ ] Derive writing tuple struct - [ ] Derive writing `tuple` container types -- [x] Support reading `String`, `bool`, `i32`, `f32`, `f64`, `Vec` +- [x] Support reading `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec` - [ ] Support reading/writing dictionaries - [x] Support reading/writing logical types like timestamp - [ ] Derive definition_levels for `Option` for reading diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index dfac17c51c1c..1f287a4aa02c 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -190,8 +190,6 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke let derived_for = input.ident; let generics = input.generics; - let convertable = parquet_field::get_convertable_quote(); - (quote! { impl #generics ::parquet::record::RecordReader<#derived_for #generics> for &mut [#derived_for #generics] { fn read_from_row_group( @@ -201,8 +199,6 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke ) -> Result<(), ::parquet::errors::ParquetError> { use ::parquet::column::reader::ColumnReader; - #convertable - let mut row_group_reader = row_group_reader; let records = self; // Used by all the reader snippets to be more clear diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 5a48d8d6921d..0e636f6ed4b1 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -245,24 +245,14 @@ impl Field { /// /// `Option` types and references not supported pub fn reader_snippet(&self) -> proc_macro2::TokenStream { - use parquet::basic::Type as BasicType; - let ident = &self.ident; let column_reader = self.ty.column_reader(); let parquet_type = self.ty.physical_type_as_rust(); - let default_type = match self.ty.physical_type() { - BasicType::BYTE_ARRAY | BasicType::FIXED_LEN_BYTE_ARRAY => { - quote! { parquet::data_type::ByteArray::new() } - } - BasicType::BOOLEAN => quote! { false }, - BasicType::FLOAT | BasicType::DOUBLE => quote! { 0. }, - BasicType::INT32 | BasicType::INT64 | BasicType::INT96 => quote! { 0 }, - }; - + // generate the code to read the column into a vector `vals` let write_batch_expr = quote! 
{ let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, #default_type); + vals_vec.resize(max_records, Default::default()); let mut vals: &mut [#parquet_type] = vals_vec.as_mut_slice(); if let #column_reader(mut typed) = column_reader { typed.read_records(max_records, None, None, vals)?; @@ -271,6 +261,8 @@ impl Field { } }; + // generate the code to convert each element of `vals` to the correct type and then write + // it to its field in the corresponding struct let vals_writer = match &self.ty { Type::TypePath(_) => self.copied_direct_fields(), Type::Reference(_, ref first_type) => match **first_type { @@ -400,26 +392,28 @@ impl Field { fn copied_direct_vals(&self) -> proc_macro2::TokenStream { let field_name = &self.ident; - let is_a_byte_buf = self.is_a_byte_buf; - let is_a_timestamp = - self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime); - let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate); - let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid); - let access = if is_a_timestamp { - quote! { rec.#field_name.timestamp_millis() } - } else if is_a_date { - quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 } - } else if is_a_uuid { - quote! { (&rec.#field_name.to_string()[..]).into() } - } else if is_a_byte_buf { - quote! { (&rec.#field_name[..]).into() } - } else { - // Type might need converting to a physical type - match self.ty.physical_type() { - parquet::basic::Type::INT32 => quote! { rec.#field_name as i32 }, - parquet::basic::Type::INT64 => quote! { rec.#field_name as i64 }, - _ => quote! { rec.#field_name }, + let access = match self.third_party_type { + Some(ThirdPartyType::ChronoNaiveDateTime) => { + quote! { rec.#field_name.timestamp_millis() } + } + Some(ThirdPartyType::ChronoNaiveDate) => { + quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 } + } + Some(ThirdPartyType::Uuid) => { + quote! { (&rec.#field_name.to_string()[..]).into() } + } + _ => { + if self.is_a_byte_buf { + quote! { (&rec.#field_name[..]).into() } + } else { + // Type might need converting to a physical type + match self.ty.physical_type() { + parquet::basic::Type::INT32 => quote! { rec.#field_name as i32 }, + parquet::basic::Type::INT64 => quote! { rec.#field_name as i64 }, + _ => quote! { rec.#field_name }, + } + } } }; @@ -430,40 +424,51 @@ impl Field { fn copied_direct_fields(&self) -> proc_macro2::TokenStream { let field_name = &self.ident; - let is_a_byte_buf = self.is_a_byte_buf; - let is_a_timestamp = - self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime); - let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate); - let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid); - let value = if is_a_timestamp { - quote! { ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap() } - } else if is_a_date { - quote! { ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i] - + ((::chrono::naive::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap() - - ::chrono::naive::NaiveDate::from_ymd_opt(0, 12, 31).unwrap()).num_days()) as i32).unwrap() } - } else if is_a_uuid { - quote! { ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap() } - } else if is_a_byte_buf { - quote! { vals[i].data().convert() } - } else { - quote! { vals[i].convert() } + let value = match self.third_party_type { + Some(ThirdPartyType::ChronoNaiveDateTime) => { + quote! 
{ ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap() } + } + Some(ThirdPartyType::ChronoNaiveDate) => { + quote! { + ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i] + + ((::chrono::naive::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap() + .signed_duration_since( + ::chrono::naive::NaiveDate::from_ymd_opt(0, 12, 31).unwrap() + ) + ).num_days()) as i32).unwrap() + } + } + Some(ThirdPartyType::Uuid) => { + quote! { ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap() } + } + _ => match &self.ty { + Type::Reference(_, ref first_type) => match **first_type { + Type::TypePath(_) => match self.ty.last_part().as_str() { + "String" => quote! { + std::rc::Rc::new(String::from(std::str::from_utf8(vals[i].data()).expect("invalid UTF-8 sequence"))) + }, + "str" => quote! { + std::rc::Rc::new(*std::str::from_utf8(vals[i].data()).expect("invalid UTF-8 sequence")) + }, + f => unimplemented!("Unsupported: {:#?}", f), + }, + Type::Slice(_) => quote! { std::rc::Rc::new(*vals[i].data()) }, + ref f => unimplemented!("Unsupported: {:#?}", f), + }, + Type::TypePath(_) => match self.ty.last_part().as_str() { + "String" => quote! { String::from(std::str::from_utf8(vals[i].data()) + .expect("invalid UTF-8 sequence")) }, + t => { + let s: proc_macro2::TokenStream = t.parse().unwrap(); + quote! { vals[i] as #s } + } + }, + Type::Vec(_) => quote! { vals[i].data().to_vec() }, + f => unimplemented!("Unsupported: {:#?}", f), + }, }; - // The below code would support references in field types, but it prevents the compiler - // from doing type inference, so it is necessary to work out the target type and annotate - // it for this to be possible. - // - // if let Type::Reference(_, x) = &self.ty { - // if **x == Type::TypePath(syn::parse_str("str").unwrap()) { - // value = quote!{ *(#value) }; - // } - // if let Type::Slice(..) = **x { - // value = quote!{ *(#value) }; - // } - // value = quote!{ &*std::rc::Rc::new(#value) }; - // } - quote! { let length = std::cmp::min(vals.len(), records.len()); for (i, r) in &mut records[..length].iter_mut().enumerate() { @@ -676,7 +681,9 @@ impl Type { BasicType::FLOAT => quote! { f32 }, BasicType::DOUBLE => quote! { f64 }, BasicType::BYTE_ARRAY => quote! { ::parquet::data_type::ByteArray }, - BasicType::FIXED_LEN_BYTE_ARRAY => quote! { ::parquet::data_type::ByteArray }, + BasicType::FIXED_LEN_BYTE_ARRAY => { + quote! { ::parquet::data_type::FixedLenByteArray } + } } } @@ -838,66 +845,6 @@ impl Type { } } -// returns quote of function needed to convert from parquet -// types back into the types used in the struct fields. -pub fn get_convertable_quote() -> proc_macro2::TokenStream { - quote! { - trait Convertable { - fn convert(self) -> T; - } - - macro_rules! 
convert_as { - ($from:ty, $to:ty) => { - impl Convertable<$to> for $from { - fn convert(self) -> $to { - self as $to - } - } - }; - } - - convert_as!(i32, u8); - convert_as!(i32, u16); - convert_as!(i32, u32); - convert_as!(i32, i8); - convert_as!(i32, i16); - convert_as!(i32, i32); - convert_as!(i32, usize); - convert_as!(i32, isize); - convert_as!(i64, u64); - convert_as!(i64, i64); - convert_as!(i64, usize); - convert_as!(i64, isize); - convert_as!(bool, bool); - convert_as!(f32, f32); - convert_as!(f64, f64); - - impl Convertable for &[u8] { - fn convert(self) -> String { - String::from(std::str::from_utf8(self).expect("invalid UTF-8 sequence")) - } - } - - impl Convertable> for &[u8] { - fn convert(self) -> Vec { - self.to_vec() - } - } - - impl<'a> Convertable<&'a str> for &'a [u8] { - fn convert(self) -> &'a str { - std::str::from_utf8(self).expect("invalid UTF-8 sequence") - } - } - - impl<'a> Convertable<&'a [u8]> for &'a [u8] { - fn convert(self) -> &'a [u8] { - self - } - } - } -} - #[cfg(test)] mod test { use super::*; @@ -957,7 +904,7 @@ mod test { (quote!{ { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, 0); + vals_vec.resize(max_records, Default::default()); let mut vals: &mut[i64] = vals_vec.as_mut_slice(); if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { typed.read_records(max_records, None, None, vals)?; @@ -966,7 +913,7 @@ mod test { } let length = std::cmp::min(vals.len(), records.len()); for (i, r) in &mut records[..length].iter_mut().enumerate() { - r.counter = vals[i].convert(); + r.counter = vals[i] as usize; } } }).to_string() @@ -1336,7 +1283,7 @@ mod test { assert_eq!(when.reader_snippet().to_string(),(quote!{ { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, 0); + vals_vec.resize(max_records, Default::default()); let mut vals: &mut[i64] = vals_vec.as_mut_slice(); if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { typed.read_records(max_records, None, None, vals)?; @@ -1407,7 +1354,7 @@ mod test { assert_eq!(when.reader_snippet().to_string(),(quote!{ { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, 0); + vals_vec.resize(max_records, Default::default()); let mut vals: &mut [i32] = vals_vec.as_mut_slice(); if let ColumnReader::Int32ColumnReader(mut typed) = column_reader { typed.read_records(max_records, None, None, vals)?; @@ -1418,7 +1365,9 @@ mod test { for (i, r) in &mut records[..length].iter_mut().enumerate() { r.henceforth = ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i] + ((::chrono::naive::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap() - - ::chrono::naive::NaiveDate::from_ymd_opt(0, 12, 31).unwrap()).num_days()) as i32).unwrap(); + .signed_duration_since( + ::chrono::naive::NaiveDate::from_ymd_opt(0, 12, 31).unwrap() + )).num_days()) as i32).unwrap(); } } }).to_string()); @@ -1480,7 +1429,7 @@ mod test { assert_eq!(when.reader_snippet().to_string(),(quote!{ { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, parquet::data_type::ByteArray::new()); + vals_vec.resize(max_records, Default::default()); let mut vals: &mut [::parquet::data_type::ByteArray] = vals_vec.as_mut_slice(); if let ColumnReader::ByteArrayColumnReader(mut typed) = column_reader { typed.read_records(max_records, None, None, vals)?; diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index 58bdaf23bd1a..881b7a4892d5 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -51,8 +51,10 @@ struct ACompleteRecord<'a> { #[derive(PartialEq, 
ParquetRecordWriter, ParquetRecordReader, Debug)] struct APartiallyCompleteRecord { - pub a_bool: bool, - pub a_string: String, + pub bool: bool, + //pub str_reference: &'a str, + pub string: String, + //pub string_reference: &'a String, pub i16: i16, pub i32: i32, pub u64: u64, @@ -62,6 +64,7 @@ struct APartiallyCompleteRecord { pub now: chrono::NaiveDateTime, pub date: chrono::NaiveDate, pub byte_vec: Vec, + //pub byte_slice: &'a [u8], } #[cfg(test)] @@ -169,8 +172,10 @@ mod tests { let file = get_temp_file("test_parquet_derive_combined", &[]); let mut drs: Vec = vec![APartiallyCompleteRecord { - a_bool: true, - a_string: "a string".into(), + bool: true, + //str_reference: "a str", + string: "a string".into(), + //string_reference: &"a string reference".into(), i16: -45, i32: 456, u64: 4563424, @@ -180,11 +185,14 @@ mod tests { now: chrono::Utc::now().naive_local(), date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(), byte_vec: vec![0x65, 0x66, 0x67], + //byte_slice: &vec![0x65, 0x66, 0x67][..], }]; let mut out: Vec = vec![APartiallyCompleteRecord { - a_bool: false, - a_string: "a different string".into(), + bool: false, + //str_reference: "a different str", + string: "a different string".into(), + //string_reference: &"a different string reference".into(), i16: -450, i32: 4560, u64: 45634240, @@ -194,6 +202,7 @@ mod tests { now: chrono::Utc::now().naive_local(), date: chrono::naive::NaiveDate::from_ymd_opt(1982, 1, 27).unwrap(), byte_vec: vec![0x17, 0x18, 0x19], + //byte_slice: &vec![0x17, 0x18, 0x19][..], }]; use parquet::file::{ From 7d4f128e0ee847841b9318a2de562e45c9ae2a49 Mon Sep 17 00:00:00 2001 From: joseph rance Date: Tue, 19 Sep 2023 17:20:24 +0100 Subject: [PATCH 05/14] remove references --- parquet_derive/src/parquet_field.rs | 13 ------------- parquet_derive_test/src/lib.rs | 9 --------- 2 files changed, 22 deletions(-) diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 0e636f6ed4b1..80a34dcce7d5 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -443,19 +443,6 @@ impl Field { quote! { ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap() } } _ => match &self.ty { - Type::Reference(_, ref first_type) => match **first_type { - Type::TypePath(_) => match self.ty.last_part().as_str() { - "String" => quote! { - std::rc::Rc::new(String::from(std::str::from_utf8(vals[i].data()).expect("invalid UTF-8 sequence"))) - }, - "str" => quote! { - std::rc::Rc::new(*std::str::from_utf8(vals[i].data()).expect("invalid UTF-8 sequence")) - }, - f => unimplemented!("Unsupported: {:#?}", f), - }, - Type::Slice(_) => quote! { std::rc::Rc::new(*vals[i].data()) }, - ref f => unimplemented!("Unsupported: {:#?}", f), - }, Type::TypePath(_) => match self.ty.last_part().as_str() { "String" => quote! 
{ String::from(std::str::from_utf8(vals[i].data()) .expect("invalid UTF-8 sequence")) }, diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index 881b7a4892d5..b02d4ecfb254 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -52,9 +52,7 @@ struct ACompleteRecord<'a> { #[derive(PartialEq, ParquetRecordWriter, ParquetRecordReader, Debug)] struct APartiallyCompleteRecord { pub bool: bool, - //pub str_reference: &'a str, pub string: String, - //pub string_reference: &'a String, pub i16: i16, pub i32: i32, pub u64: u64, @@ -64,7 +62,6 @@ struct APartiallyCompleteRecord { pub now: chrono::NaiveDateTime, pub date: chrono::NaiveDate, pub byte_vec: Vec, - //pub byte_slice: &'a [u8], } #[cfg(test)] @@ -173,9 +170,7 @@ mod tests { let mut drs: Vec = vec![APartiallyCompleteRecord { bool: true, - //str_reference: "a str", string: "a string".into(), - //string_reference: &"a string reference".into(), i16: -45, i32: 456, u64: 4563424, @@ -185,14 +180,11 @@ mod tests { now: chrono::Utc::now().naive_local(), date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(), byte_vec: vec![0x65, 0x66, 0x67], - //byte_slice: &vec![0x65, 0x66, 0x67][..], }]; let mut out: Vec = vec![APartiallyCompleteRecord { bool: false, - //str_reference: "a different str", string: "a different string".into(), - //string_reference: &"a different string reference".into(), i16: -450, i32: 4560, u64: 45634240, @@ -202,7 +194,6 @@ mod tests { now: chrono::Utc::now().naive_local(), date: chrono::naive::NaiveDate::from_ymd_opt(1982, 1, 27).unwrap(), byte_vec: vec![0x17, 0x18, 0x19], - //byte_slice: &vec![0x17, 0x18, 0x19][..], }]; use parquet::file::{ From 8cd0a4e03a9f2ed0cca9c36f95b2833c9ec4e5c9 Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Mon, 2 Oct 2023 12:07:25 +0100 Subject: [PATCH 06/14] change interface to use vectors --- parquet_derive_test/src/lib.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index b02d4ecfb254..c7cc1941efc8 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -150,7 +150,7 @@ mod tests { borrowed_maybe_borrowed_byte_vec: &borrowed_maybe_borrowed_byte_vec, }]; - let generated_schema = drs.as_slice().schema().unwrap(); + let generated_schema = drs.schema().unwrap(); assert_eq!(&schema, &generated_schema); @@ -159,7 +159,7 @@ mod tests { SerializedFileWriter::new(file, generated_schema, props).unwrap(); let mut row_group = writer.next_row_group().unwrap(); - drs.as_slice().write_to_row_group(&mut row_group).unwrap(); + drs.write_to_row_group(&mut row_group).unwrap(); row_group.close().unwrap(); writer.close().unwrap(); } @@ -200,7 +200,7 @@ mod tests { reader::FileReader, serialized_reader::SerializedFileReader, }; - let generated_schema = drs.as_slice().schema().unwrap(); + let generated_schema = drs.schema().unwrap(); let props = Default::default(); let mut writer = @@ -208,16 +208,14 @@ mod tests { .unwrap(); let mut row_group = writer.next_row_group().unwrap(); - drs.as_slice().write_to_row_group(&mut row_group).unwrap(); + drs.write_to_row_group(&mut row_group).unwrap(); row_group.close().unwrap(); writer.close().unwrap(); let reader = SerializedFileReader::new(file).unwrap(); let mut row_group = reader.get_row_group(0).unwrap(); - out.as_mut_slice() - .read_from_row_group(&mut *row_group, 2) - .unwrap(); + out.read_from_row_group(&mut *row_group, 2).unwrap(); // correct for rounding error when writing 
milliseconds drs[0].now = chrono::naive::NaiveDateTime::from_timestamp_millis( From 3eb5239c88c2b84b97b5ef273c9c6f1260d7c8dd Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Mon, 2 Oct 2023 12:08:30 +0100 Subject: [PATCH 07/14] change interface to use vectors in as well --- parquet_derive/README.md | 4 ++-- parquet_derive/src/lib.rs | 9 ++++---- parquet_derive/src/parquet_field.rs | 35 +++++++++++++---------------- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/parquet_derive/README.md b/parquet_derive/README.md index fd121f28d0ed..7ea9575bb096 100644 --- a/parquet_derive/README.md +++ b/parquet_derive/README.md @@ -71,7 +71,7 @@ let mut row_group = writer.next_row_group().unwrap(); let chunks = vec![ACompleteRecord{...}]; // The derived `RecordWriter` takes over here -(&chunks[..]).write_to_row_group(&mut row_group); +chunks.write_to_row_group(&mut row_group); writer.close_row_group(row_group).unwrap(); writer.close().unwrap(); @@ -105,7 +105,7 @@ let mut row_group = reader.get_row_group(0).unwrap(); let mut chunks = vec![ACompleteRecord{ ... }]; // The derived `RecordReader` takes over here -chunks.as_mut_slice().read_from_row_group(&mut *row_group, 2).unwrap(); +chunks.read_from_row_group(&mut *row_group, 2).unwrap(); ``` ## Features diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index 1f287a4aa02c..74ce9df64343 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -68,7 +68,7 @@ mod parquet_field; /// let mut writer = SerializedFileWriter::new(file, schema, Default::default()).unwrap(); /// /// let mut row_group = writer.next_row_group().unwrap(); -/// samples.as_slice().write_to_row_group(&mut row_group).unwrap(); +/// samples.write_to_row_group(&mut row_group).unwrap(); /// writer.close_row_group(row_group).unwrap(); /// writer.close().unwrap(); /// } @@ -95,7 +95,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke field_infos.iter().map(|x| x.parquet_type()).collect(); (quote! { - impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] { + impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for Vec<#derived_for #generics> { fn write_to_row_group( &self, row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W> @@ -168,7 +168,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke /// /// let reader = SerializedFileReader::new(file).unwrap(); /// let mut row_group = reader.get_row_group(0).unwrap(); -/// samples.as_mut_slice().read_from_row_group(&mut *row_group, 2).unwrap(); +/// samples.read_from_row_group(&mut *row_group, 2).unwrap(); /// samples /// } /// ``` @@ -191,7 +191,7 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke let generics = input.generics; (quote! 
{ - impl #generics ::parquet::record::RecordReader<#derived_for #generics> for &mut [#derived_for #generics] { + impl #generics ::parquet::record::RecordReader<#derived_for #generics> for Vec<#derived_for #generics> { fn read_from_row_group( &mut self, row_group_reader: &mut dyn ::parquet::file::reader::RowGroupReader, @@ -201,6 +201,7 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke let mut row_group_reader = row_group_reader; let records = self; // Used by all the reader snippets to be more clear + let num_records = std::cmp::min(max_records, records.len()); #( { diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 80a34dcce7d5..f79940b093b3 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -252,10 +252,10 @@ impl Field { // generate the code to read the column into a vector `vals` let write_batch_expr = quote! { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, Default::default()); + vals_vec.resize(num_records, Default::default()); let mut vals: &mut [#parquet_type] = vals_vec.as_mut_slice(); if let #column_reader(mut typed) = column_reader { - typed.read_records(max_records, None, None, vals)?; + typed.read_records(num_records, None, None, vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{#ident}); } @@ -457,8 +457,7 @@ impl Field { }; quote! { - let length = std::cmp::min(vals.len(), records.len()); - for (i, r) in &mut records[..length].iter_mut().enumerate() { + for (i, r) in &mut records[..num_records].iter_mut().enumerate() { r.#field_name = #value; } } @@ -891,15 +890,14 @@ mod test { (quote!{ { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, Default::default()); + vals_vec.resize(num_records, Default::default()); let mut vals: &mut[i64] = vals_vec.as_mut_slice(); if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { - typed.read_records(max_records, None, None, vals)?; + typed.read_records(num_records, None, None, vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{ counter }); } - let length = std::cmp::min(vals.len(), records.len()); - for (i, r) in &mut records[..length].iter_mut().enumerate() { + for (i, r) in &mut records[..num_records].iter_mut().enumerate() { r.counter = vals[i] as usize; } } @@ -1270,15 +1268,14 @@ mod test { assert_eq!(when.reader_snippet().to_string(),(quote!{ { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, Default::default()); + vals_vec.resize(num_records, Default::default()); let mut vals: &mut[i64] = vals_vec.as_mut_slice(); if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { - typed.read_records(max_records, None, None, vals)?; + typed.read_records(num_records, None, None, vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{ henceforth }); } - let length = std::cmp::min(vals.len(), records.len()); - for (i, r) in &mut records[..length].iter_mut().enumerate() { + for (i, r) in &mut records[..num_records].iter_mut().enumerate() { r.henceforth = ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap(); } } @@ -1341,15 +1338,14 @@ mod test { assert_eq!(when.reader_snippet().to_string(),(quote!{ { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, Default::default()); + vals_vec.resize(num_records, Default::default()); let mut vals: &mut [i32] = vals_vec.as_mut_slice(); if let ColumnReader::Int32ColumnReader(mut typed) = column_reader { - 
typed.read_records(max_records, None, None, vals)?; + typed.read_records(num_records, None, None, vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{ henceforth }); } - let length = std::cmp::min(vals.len(), records.len()); - for (i, r) in &mut records[..length].iter_mut().enumerate() { + for (i, r) in &mut records[..num_records].iter_mut().enumerate() { r.henceforth = ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i] + ((::chrono::naive::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap() .signed_duration_since( @@ -1416,15 +1412,14 @@ mod test { assert_eq!(when.reader_snippet().to_string(),(quote!{ { let mut vals_vec = Vec::new(); - vals_vec.resize(max_records, Default::default()); + vals_vec.resize(num_records, Default::default()); let mut vals: &mut [::parquet::data_type::ByteArray] = vals_vec.as_mut_slice(); if let ColumnReader::ByteArrayColumnReader(mut typed) = column_reader { - typed.read_records(max_records, None, None, vals)?; + typed.read_records(num_records, None, None, vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{ unique_id }); } - let length = std::cmp::min(vals.len(), records.len()); - for (i, r) in &mut records[..length].iter_mut().enumerate() { + for (i, r) in &mut records[..num_records].iter_mut().enumerate() { r.unique_id = ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap(); } } From 555067fa00c38253e68cc9e79ac2ca2a92039254 Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Mon, 2 Oct 2023 12:54:07 +0100 Subject: [PATCH 08/14] update comments --- parquet/src/record/record_reader.rs | 2 ++ parquet/src/record/record_writer.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/parquet/src/record/record_reader.rs b/parquet/src/record/record_reader.rs index 938c14b29406..34d93262b016 100644 --- a/parquet/src/record/record_reader.rs +++ b/parquet/src/record/record_reader.rs @@ -19,6 +19,8 @@ use super::super::errors::ParquetError; use super::super::file::reader::RowGroupReader; /// read up to `max_records` records from `row_group_reader` into `self` +/// The type parameter `T` is used to work around the rust orphan rule +/// when implementing on types such as `Vec<T>`. pub trait RecordReader<T> { fn read_from_row_group( &mut self, diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs index 5a749dec407a..a9cbbbe6ccec 100644 --- a/parquet/src/record/record_writer.rs +++ b/parquet/src/record/record_writer.rs @@ -22,6 +22,8 @@ use super::super::file::writer::SerializedRowGroupWriter; /// `write_to_row_group` writes from `self` into `row_group_writer` /// `schema` builds the schema used by `row_group_writer` +/// The type parameter `T` is used to work around the rust orphan rule +/// when implementing on types such as `Vec<T>`.
pub trait RecordWriter<T> { fn write_to_row_group( &self, From 83e4df383ecdd350ed5e3fae090e17d291422921 Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Thu, 5 Oct 2023 10:41:02 +0100 Subject: [PATCH 09/14] remove initialisation requirement --- parquet/src/record/record_reader.rs | 2 +- parquet_derive/README.md | 2 +- parquet_derive/src/lib.rs | 27 ++++++++++++++++++++++++--- parquet_derive_test/src/lib.rs | 16 ++-------------- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/parquet/src/record/record_reader.rs b/parquet/src/record/record_reader.rs index 34d93262b016..bcfeb95dcdf4 100644 --- a/parquet/src/record/record_reader.rs +++ b/parquet/src/record/record_reader.rs @@ -25,6 +25,6 @@ pub trait RecordReader<T> { fn read_from_row_group( &mut self, row_group_reader: &mut dyn RowGroupReader, - max_records: usize, + num_records: usize, ) -> Result<(), ParquetError>; } diff --git a/parquet_derive/README.md b/parquet_derive/README.md index 7ea9575bb096..f737ec1c851e 100644 --- a/parquet_derive/README.md +++ b/parquet_derive/README.md @@ -105,7 +105,7 @@ let mut row_group = reader.get_row_group(0).unwrap(); let mut chunks = vec![ACompleteRecord{ ... }]; // The derived `RecordReader` takes over here -chunks.read_from_row_group(&mut *row_group, 2).unwrap(); +chunks.read_from_row_group(&mut *row_group, 1).unwrap(); ``` ## Features diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index 74ce9df64343..5932373aeb16 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -168,7 +168,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke /// /// let reader = SerializedFileReader::new(file).unwrap(); /// let mut row_group = reader.get_row_group(0).unwrap(); -/// samples.read_from_row_group(&mut *row_group, 2).unwrap(); +/// samples.read_from_row_group(&mut *row_group, 1).unwrap(); /// samples /// } /// ``` @@ -183,6 +183,7 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke }; let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect(); + let field_names: Vec<_> = fields.iter().map(|f| f.ident.clone()).collect(); let reader_snippets: Vec<proc_macro2::TokenStream> = field_infos.iter().map(|x| x.reader_snippet()).collect(); let i: Vec<_> = (0..reader_snippets.len()).collect(); @@ -191,17 +192,37 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke let generics = input.generics; (quote!
{ + + use std::default; + + impl Default for #derived_for { + fn default() -> Self { + Self { + #( + #field_names: Default::default() + ),* + } + } + } + + impl #generics ::parquet::record::RecordReader<#derived_for #generics> for Vec<#derived_for #generics> { fn read_from_row_group( &mut self, row_group_reader: &mut dyn ::parquet::file::reader::RowGroupReader, - max_records: usize, + num_records: usize, ) -> Result<(), ::parquet::errors::ParquetError> { use ::parquet::column::reader::ColumnReader; let mut row_group_reader = row_group_reader; + + for _ in 0..num_records { + self.push(#derived_for { + ..Default::default() + }) + } + let records = self; // Used by all the reader snippets to be more clear - let num_records = std::cmp::min(max_records, records.len()); #( { diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index c7cc1941efc8..f6e046ebad50 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -182,19 +182,7 @@ mod tests { byte_vec: vec![0x65, 0x66, 0x67], }]; - let mut out: Vec<APartiallyCompleteRecord> = vec![APartiallyCompleteRecord { - bool: false, - string: "a different string".into(), - i16: -450, - i32: 4560, - u64: 45634240, - isize: -3650, - float: 30.5, - double: 10., - now: chrono::Utc::now().naive_local(), - date: chrono::naive::NaiveDate::from_ymd_opt(1982, 1, 27).unwrap(), - byte_vec: vec![0x17, 0x18, 0x19], - }]; + let mut out: Vec<APartiallyCompleteRecord> = Vec::new(); use parquet::file::{ reader::FileReader, serialized_reader::SerializedFileReader, }; @@ -215,7 +203,7 @@ mod tests { let reader = SerializedFileReader::new(file).unwrap(); let mut row_group = reader.get_row_group(0).unwrap(); - out.read_from_row_group(&mut *row_group, 2).unwrap(); + out.read_from_row_group(&mut *row_group, 1).unwrap(); // correct for rounding error when writing milliseconds drs[0].now = chrono::naive::NaiveDateTime::from_timestamp_millis( From 378785cef9f2e6f5bc79dbbc023fe2e5fd0104a1 Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Thu, 5 Oct 2023 10:47:22 +0100 Subject: [PATCH 10/14] prevent conflicts with existing default implementation --- parquet_derive/src/lib.rs | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index 5932373aeb16..3f6fe1164d1f 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -193,19 +193,6 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke (quote!
{ - - use std::default; - - impl Default for #derived_for { - fn default() -> Self { - Self { - #( - #field_names: Default::default() - ),* - } - } - } - - impl #generics ::parquet::record::RecordReader<#derived_for #generics> for Vec<#derived_for #generics> { fn read_from_row_group( &mut self, @@ -218,7 +205,9 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke for _ in 0..num_records { self.push(#derived_for { - ..Default::default() + #( + #field_names: Default::default() + ),* }) } From e5a0acce565786654aec01a8457b15aa561ff1bf Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Thu, 5 Oct 2023 10:55:38 +0100 Subject: [PATCH 11/14] update documentation --- parquet_derive/README.md | 4 ++-- parquet_derive/src/lib.rs | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/parquet_derive/README.md b/parquet_derive/README.md index f737ec1c851e..49867f04ecd2 100644 --- a/parquet_derive/README.md +++ b/parquet_derive/README.md @@ -101,8 +101,8 @@ struct ACompleteRecord { let reader = SerializedFileReader::new(file).unwrap(); let mut row_group = reader.get_row_group(0).unwrap(); -// create your records to read into -let mut chunks = vec![ACompleteRecord{ ... }]; +// create your records vector to read into +let mut chunks: Vec<ACompleteRecord> = Vec::new(); // The derived `RecordReader` takes over here chunks.read_from_row_group(&mut *row_group, 1).unwrap(); ``` diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index 3f6fe1164d1f..2701b7b9ce58 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -159,12 +159,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke /// } /// /// pub fn read_some_records() -> ACompleteRecord { -/// let mut samples = vec![ -/// ACompleteRecord { -/// a_bool: true, -/// a_string: String::from("I'm true"); -/// } -/// ]; +/// let mut samples: Vec<ACompleteRecord> = Vec::new(); /// /// let reader = SerializedFileReader::new(file).unwrap(); From 7cac70c922a8e0fef109517b42b57c86a953f1d6 Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Fri, 20 Oct 2023 22:49:03 +0100 Subject: [PATCH 12/14] run cargo fmt --- parquet_derive/src/parquet_field.rs | 34 +++++++++++++++-------------- parquet_derive_test/src/lib.rs | 14 +++++------- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 88be575f5c7b..18411e970463 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -875,22 +875,24 @@ mod test { let counter = Field::from(&fields[0]); let snippet = counter.reader_snippet().to_string(); - assert_eq!(snippet, - (quote!{ - { - let mut vals_vec = Vec::new(); - vals_vec.resize(num_records, Default::default()); - let mut vals: &mut[i64] = vals_vec.as_mut_slice(); - if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { - typed.read_records(num_records, None, None, vals)?; - } else { - panic!("Schema and struct disagree on type for {}", stringify!{ counter }); - } - for (i, r) in &mut records[..num_records].iter_mut().enumerate() { - r.counter = vals[i] as usize; - } - } - }).to_string() + assert_eq!( + snippet, + (quote!
{ + { + let mut vals_vec = Vec::new(); + vals_vec.resize(num_records, Default::default()); + let mut vals: &mut[i64] = vals_vec.as_mut_slice(); + if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { + typed.read_records(num_records, None, None, vals)?; + } else { + panic!("Schema and struct disagree on type for {}", stringify!{ counter }); + } + for (i, r) in &mut records[..num_records].iter_mut().enumerate() { + r.counter = vals[i] as usize; + } + } + }) + .to_string() ) } diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index 21aa5e1d4fd5..964c766a13c6 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -183,16 +183,13 @@ mod tests { let mut out: Vec<APartiallyCompleteRecord> = Vec::new(); - use parquet::file::{ - reader::FileReader, serialized_reader::SerializedFileReader, - }; + use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader}; let generated_schema = drs.schema().unwrap(); let props = Default::default(); let mut writer = - SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props) - .unwrap(); + SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap(); let mut row_group = writer.next_row_group().unwrap(); drs.write_to_row_group(&mut row_group).unwrap(); @@ -205,10 +202,9 @@ mod tests { out.read_from_row_group(&mut *row_group, 1).unwrap(); // correct for rounding error when writing milliseconds - drs[0].now = chrono::naive::NaiveDateTime::from_timestamp_millis( - drs[0].now.timestamp_millis(), - ) - .unwrap(); + drs[0].now = + chrono::naive::NaiveDateTime::from_timestamp_millis(drs[0].now.timestamp_millis()) + .unwrap(); assert!(out[0].double.is_nan()); // these three lines are necessary because NAN != NAN out[0].double = 0.; From 767947a2e5bdcf47a91f1b70b8ff51989fff71e3 Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Sun, 29 Oct 2023 18:19:34 +0000 Subject: [PATCH 13/14] change writer back to slice --- parquet/src/record/record_writer.rs | 2 +- parquet_derive/README.md | 10 +++++----- parquet_derive/src/lib.rs | 6 +++--- parquet_derive/src/parquet_field.rs | 2 ++ parquet_derive_test/src/lib.rs | 8 ++++---- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs index a9cbbbe6ccec..0b2b95ef7dea 100644 --- a/parquet/src/record/record_writer.rs +++ b/parquet/src/record/record_writer.rs @@ -23,7 +23,7 @@ use super::super::file::writer::SerializedRowGroupWriter; /// `write_to_row_group` writes from `self` into `row_group_writer` /// `schema` builds the schema used by `row_group_writer` /// The type parameter `T` is used to work around the rust orphan rule -/// when implementing on types such as `Vec<T>`. +/// when implementing on types such as `&[T]`.
pub trait RecordWriter<T> { fn write_to_row_group( &self, diff --git a/parquet_derive/README.md b/parquet_derive/README.md index 49867f04ecd2..984114186e37 100644 --- a/parquet_derive/README.md +++ b/parquet_derive/README.md @@ -71,7 +71,7 @@ let mut row_group = writer.next_row_group().unwrap(); let chunks = vec![ACompleteRecord{...}]; // The derived `RecordWriter` takes over here -chunks.write_to_row_group(&mut row_group); +(&chunks[..]).write_to_row_group(&mut row_group); writer.close_row_group(row_group).unwrap(); writer.close().unwrap(); @@ -113,16 +113,16 @@ chunks.read_from_row_group(&mut *row_group, 1).unwrap(); - [x] Support writing `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>` - [ ] Support writing dictionaries - [x] Support writing logical types like timestamp -- [x] Derive definition_levels for `Option` for writing -- [ ] Derive definition levels for nested structures for writing +- [x] Handle definition_levels for `Option` for writing +- [ ] Handle definition levels for nested structures for writing - [ ] Derive writing tuple struct - [ ] Derive writing `tuple` container types - [x] Support reading `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>` - [ ] Support reading/writing dictionaries - [x] Support reading/writing logical types like timestamp -- [ ] Derive definition_levels for `Option` for reading -- [ ] Derive definition levels for nested structures for reading +- [ ] Handle definition_levels for `Option` for reading +- [ ] Handle definition levels for nested structures for reading - [ ] Derive reading/writing tuple struct - [ ] Derive reading/writing `tuple` container types diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index 2701b7b9ce58..671a46db0f31 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -68,7 +68,7 @@ mod parquet_field; /// let mut writer = SerializedFileWriter::new(file, schema, Default::default()).unwrap(); /// /// let mut row_group = writer.next_row_group().unwrap(); -/// samples.write_to_row_group(&mut row_group).unwrap(); +/// samples.as_slice().write_to_row_group(&mut row_group).unwrap(); /// writer.close_row_group(row_group).unwrap(); /// writer.close().unwrap(); /// } @@ -95,7 +95,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke field_infos.iter().map(|x| x.parquet_type()).collect(); (quote!
{ - impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for Vec<#derived_for #generics> { + impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] { fn write_to_row_group( &self, row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W> @@ -158,7 +158,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke /// pub a_string: String, /// } /// -/// pub fn read_some_records() -> ACompleteRecord { +/// pub fn read_some_records() -> Vec<ACompleteRecord> { /// let mut samples: Vec<ACompleteRecord> = Vec::new(); /// /// let reader = SerializedFileReader::new(file).unwrap(); diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 18411e970463..0ac95c2864e5 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -385,6 +385,7 @@ impl Field { } } + // generates code to read `field_name` from each record into a vector `vals` fn copied_direct_vals(&self) -> proc_macro2::TokenStream { let field_name = &self.ident; @@ -417,6 +418,7 @@ impl Field { } } + // generates code to read a vector `records` into `field_name` for each record fn copied_direct_fields(&self) -> proc_macro2::TokenStream { let field_name = &self.ident; diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index 964c766a13c6..a8b631ecc024 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -150,7 +150,7 @@ mod tests { borrowed_maybe_borrowed_byte_vec: &borrowed_maybe_borrowed_byte_vec, }]; - let generated_schema = drs.schema().unwrap(); + let generated_schema = drs.as_slice().schema().unwrap(); assert_eq!(&schema, &generated_schema); @@ -158,7 +158,7 @@ mod tests { let mut writer = SerializedFileWriter::new(file, generated_schema, props).unwrap(); let mut row_group = writer.next_row_group().unwrap(); - drs.write_to_row_group(&mut row_group).unwrap(); + drs.as_slice().write_to_row_group(&mut row_group).unwrap(); row_group.close().unwrap(); writer.close().unwrap(); } @@ -185,14 +185,14 @@ mod tests { use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader}; - let generated_schema = drs.schema().unwrap(); + let generated_schema = drs.as_slice().schema().unwrap(); let props = Default::default(); let mut writer = SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap(); let mut row_group = writer.next_row_group().unwrap(); - drs.write_to_row_group(&mut row_group).unwrap(); + drs.as_slice().write_to_row_group(&mut row_group).unwrap(); row_group.close().unwrap(); writer.close().unwrap(); From 96f71ee4106a4feea35c5f5ab8d6b5c9b190718f Mon Sep 17 00:00:00 2001 From: Joseph Rance Date: Sun, 29 Oct 2023 18:24:42 +0000 Subject: [PATCH 14/14] change 'Handle' back to 'Derive' for RecordWriter macro in readme --- parquet_derive/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet_derive/README.md b/parquet_derive/README.md index 984114186e37..c267a92430e0 100644 --- a/parquet_derive/README.md +++ b/parquet_derive/README.md @@ -113,8 +113,8 @@ chunks.read_from_row_group(&mut *row_group, 1).unwrap(); - [x] Support writing `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>` - [ ] Support writing dictionaries - [x] Support writing logical types like timestamp -- [x] Handle definition_levels for `Option` for writing -- [ ] Handle definition levels for nested structures for writing +- [x] Derive definition_levels for `Option` for writing +- [ ] Derive
definition levels for nested structures for writing - [ ] Derive writing tuple struct - [ ] Derive writing `tuple` container types
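
A note on the final shape of the API (not part of the patch series itself): after PATCH 13 the two derives are intentionally asymmetric — `RecordWriter` is implemented on `&[T]`, while `RecordReader` stays on `Vec<T>` because reading has to grow the container. The following is a minimal round-trip sketch of the derived traits as they stand once the whole series is applied. It mirrors the round-trip test in `parquet_derive_test`; the `Sample` struct, the `main` wrapper, and the `tempfile` dev-dependency are illustrative assumptions, not code from the patches.

```rust
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::writer::SerializedFileWriter;
use parquet::record::{RecordReader, RecordWriter};
use parquet_derive::{ParquetRecordReader, ParquetRecordWriter};

// Hypothetical flat record; any struct of supported primitive fields works,
// as long as the field order matches the file's schema when reading.
#[derive(Debug, ParquetRecordWriter, ParquetRecordReader)]
struct Sample {
    pub id: i32,
    pub score: f64,
}

fn main() {
    let file = tempfile::tempfile().unwrap();

    // Writing: the derived `RecordWriter` is implemented on `&[Sample]`,
    // so the slice both builds the schema and writes the column data.
    let rows = vec![Sample { id: 1, score: 0.5 }, Sample { id: 2, score: 1.5 }];
    let schema = rows.as_slice().schema().unwrap();
    let mut writer =
        SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default()).unwrap();
    let mut row_group = writer.next_row_group().unwrap();
    rows.as_slice().write_to_row_group(&mut row_group).unwrap();
    row_group.close().unwrap();
    writer.close().unwrap();

    // Reading: the derived `RecordReader` is implemented on `Vec<Sample>`.
    // Per PATCH 09 it pushes `num_records` default-initialised records and
    // then fills them, so the vector no longer needs to be pre-populated.
    let reader = SerializedFileReader::new(file).unwrap();
    let mut row_group = reader.get_row_group(0).unwrap();
    let mut out: Vec<Sample> = Vec::new();
    out.read_from_row_group(&mut *row_group, rows.len()).unwrap();
    assert_eq!(out.len(), rows.len());
}
```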