diff --git a/arrow-data/src/transform/boolean.rs b/arrow-data/src/transform/boolean.rs index d93fa15a4e0f..52480460e3ce 100644 --- a/arrow-data/src/transform/boolean.rs +++ b/arrow-data/src/transform/boolean.rs @@ -15,10 +15,15 @@ // specific language governing permissions and limitations // under the License. -use super::{Extend, _MutableArrayData, utils::resize_for_bits}; +use super::{Extend, _MutableArrayData, utils::resize_for_bits, ExtendNullBits, SpecializedMutableArrayData}; use crate::bit_mask::set_bits; -use crate::ArrayData; +use crate::transform::utils::build_extend_null_bits; +use crate::{ArrayData, ArrayDataBuilder}; +use arrow_buffer::{bit_util, BooleanBuffer, MutableBuffer, NullBuffer}; +use arrow_schema::DataType::Boolean; +// TODO - remove +#[deprecated] pub(super) fn build_extend(array: &ArrayData) -> Extend { let values = array.buffers()[0].as_slice(); Box::new( @@ -36,7 +41,230 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { ) } +// TODO - remove +#[deprecated] pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { let buffer = &mut mutable.buffer1; resize_for_bits(buffer, mutable.len + len); } + +/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by +/// copying chunks. +/// +/// The main use case of this struct is to perform unary operations to arrays of +/// arbitrary types, such as `filter` and `take`. +/// +/// # Example +/// ``` +/// use arrow_buffer::Buffer; +/// use arrow_data::ArrayData; +/// use arrow_data::transform::{MutableArrayData, SpecializedMutableArrayData}; +/// use arrow_schema::DataType; +/// fn i32_array(values: &[i32]) -> ArrayData { +/// ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap() +/// } +/// let arr1 = i32_array(&[1, 2, 3, 4, 5]); +/// let arr2 = i32_array(&[6, 7, 8, 9, 10]); +/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements +/// let capacity = 3 * std::mem::size_of::(); +/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10); +/// // Copy the first 3 elements from arr1 +/// mutable.extend(0, 0, 3); +/// // Copy the last 3 elements from arr2 +/// mutable.extend(1, 2, 4); +/// // Complete the MutableArrayData into a new ArrayData +/// let frozen = mutable.freeze(); +/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10])); +/// ``` +pub struct BooleanMutableArrayData<'a> { + /// Input arrays: the data being read FROM. + arrays: Vec<&'a ArrayData>, + + /// In progress output array: The data being written TO + /// + /// Note these fields are in a separate struct, [crate::transform::_MutableArrayData], as they + /// cannot be in [crate::transform::MutableArrayData] itself due to mutability invariants (interior + /// mutability): [crate::transform::MutableArrayData] contains a function that can only mutate + /// [crate::transform::_MutableArrayData], not [crate::transform::MutableArrayData] itself + data: _MutableArrayData<'a>, + + /// function used to extend the output array with nulls from input arrays. + /// + /// This function's lifetime is bound to the input arrays because it reads + /// nulls from it. + extend_null_bits: Vec>, + +} + +impl<'a> BooleanMutableArrayData { + // function that extends `[start..start+len]` to the mutable array. + fn extend_values(&mut self, array_index: usize, start: usize, len: usize) { + let array = self.arrays[array_index]; + let values = array.buffers()[0].as_slice(); + + let buffer = &mut self.data.buffer1; + resize_for_bits(buffer, self.data.len + len); + set_bits( + buffer.as_slice_mut(), + values, + self.data.len, + array.offset() + start, + len, + ); + } +} + +impl<'a> std::fmt::Debug for BooleanMutableArrayData<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // ignores the closures. + f.debug_struct("BooleanMutableArrayData") + .field("data", &self.data) + .finish() + } +} + + +impl<'a> SpecializedMutableArrayData<'a> for BooleanMutableArrayData<'a> { + /// Returns a new [crate::transform::MutableArrayData] with capacity to `capacity` slots and + /// specialized to create an [ArrayData] from multiple `arrays`. + /// + /// # Arguments + /// * `arrays` - the source arrays to copy from + /// * `use_nulls` - a flag used to optimize insertions + /// - `false` if the only source of nulls are the arrays themselves + /// - `true` if the user plans to call [crate::transform::MutableArrayData::extend_nulls]. + /// * capacity - the preallocated capacity of the output array, in bytes + /// + /// Thus, if `use_nulls` is `false`, calling + /// [crate::transform::MutableArrayData::extend_nulls] should not be used. + fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self { + + // TODO - instead change array to be of specific type like Boolean array + for a in arrays.iter() { + assert_eq!( + &Boolean, + a.data_type(), + // TODO - update error message + "Arrays with inconsistent types passed to MutableArrayData" + ) + } + + // if any of the arrays has nulls, insertions from any array requires setting bits + // as there is at least one array with nulls. + let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0); + + let array_capacity = capacity; + let [buffer1, buffer2] = { + let bytes = bit_util::ceil(capacity, 8); + let buffer = MutableBuffer::new(bytes); + [buffer, MutableBuffer::new(0)] + }; + + // TODO - is needed? + let child_data = vec![]; + + let extend_null_bits = arrays + .iter() + .map(|array| build_extend_null_bits(array, use_nulls)) + .collect(); + + let null_buffer = use_nulls.then(|| { + let null_bytes = bit_util::ceil(array_capacity, 8); + MutableBuffer::from_len_zeroed(null_bytes) + }); + + let data = _MutableArrayData { + data_type: Boolean, + len: 0, + null_count: 0, + null_buffer, + buffer1, + buffer2, + child_data, + }; + + Self { + arrays, + data, + extend_null_bits, + } + } + + /// Extends the in progress array with a region of the input arrays + /// + /// # Arguments + /// * `index` - the index of array that you what to copy values from + /// * `start` - the start index of the chunk (inclusive) + /// * `end` - the end index of the chunk (exclusive) + /// + /// # Panic + /// This function panics if there is an invalid index, + /// i.e. `index` >= the number of source arrays + /// or `end` > the length of the `index`th array + fn extend(&mut self, index: usize, start: usize, end: usize) { + let len = end - start; + (self.extend_null_bits[index])(&mut self.data, start, len); + self.extend_values(index, start, len); + self.data.len += len; + } + + /// Extends the in progress array with null elements, ignoring the input arrays. + /// + /// # Panics + /// + /// Panics if [`crate::transform::MutableArrayData`] not created with `use_nulls` or nullable source arrays + fn extend_nulls(&mut self, len: usize) { + self.data.len += len; + let bit_len = bit_util::ceil(self.data.len, 8); + let nulls = self.data.null_buffer(); + nulls.resize(bit_len, 0); + self.data.null_count += len; + + let buffer = &mut self.data.buffer1; + resize_for_bits(buffer, self.data.len + len); + } + + /// Returns the current length + #[inline] + fn len(&self) -> usize { + self.data.len + } + + /// Returns true if len is 0 + #[inline] + fn is_empty(&self) -> bool { + self.data.len == 0 + } + + /// Returns the current null count + #[inline] + fn null_count(&self) -> usize { + self.data.null_count + } + + /// Consume self and returns the in progress array as [`ArrayDataBuilder`]. + /// + /// This is useful for extending the default behavior of MutableArrayData. + fn into_builder(self) -> ArrayDataBuilder { + let data = self.data; + + let buffers = vec![data.buffer1.into()]; + + let child_data = data.child_data.into_iter().map(|x| x.freeze()).collect(); + + let nulls = data + .null_buffer + .map(|nulls| { + let bools = BooleanBuffer::new(nulls.into(), 0, data.len); + unsafe { NullBuffer::new_unchecked(bools, data.null_count) } + }) + .filter(|n| n.null_count() > 0); + + ArrayDataBuilder::new(data.data_type) + .offset(0) + .len(data.len) + .nulls(nulls) + .buffers(buffers) + .child_data(child_data) + } +} diff --git a/arrow-data/src/transform/fixed_binary.rs b/arrow-data/src/transform/fixed_binary.rs index 44c6f46ebf7e..c14c911e161e 100644 --- a/arrow-data/src/transform/fixed_binary.rs +++ b/arrow-data/src/transform/fixed_binary.rs @@ -15,10 +15,16 @@ // specific language governing permissions and limitations // under the License. -use super::{Extend, _MutableArrayData}; -use crate::ArrayData; +use super::{Extend, ExtendNullBits, SpecializedMutableArrayData, _MutableArrayData}; +use crate::data::new_buffers; +use crate::transform::utils::build_extend_null_bits; +use crate::{ArrayData, ArrayDataBuilder}; +use arrow_buffer::{bit_util, BooleanBuffer, MutableBuffer, NullBuffer}; use arrow_schema::DataType; + +// TODO - remove +#[deprecated] pub(super) fn build_extend(array: &ArrayData) -> Extend { let size = match array.data_type() { DataType::FixedSizeBinary(i) => *i as usize, @@ -34,6 +40,8 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { ) } +// TODO - remove +#[deprecated] pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { let size = match mutable.data_type { DataType::FixedSizeBinary(i) => i as usize, @@ -43,3 +51,250 @@ pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { let values_buffer = &mut mutable.buffer1; values_buffer.extend_zeros(len * size); } + + + +/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by +/// copying chunks. +/// +/// The main use case of this struct is to perform unary operations to arrays of +/// arbitrary types, such as `filter` and `take`. +/// +/// # Example +/// ``` +/// use arrow_buffer::Buffer; +/// use arrow_data::ArrayData; +/// use arrow_data::transform::{MutableArrayData, SpecializedMutableArrayData}; +/// use arrow_schema::DataType; +/// fn i32_array(values: &[i32]) -> ArrayData { +/// ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap() +/// } +/// let arr1 = i32_array(&[1, 2, 3, 4, 5]); +/// let arr2 = i32_array(&[6, 7, 8, 9, 10]); +/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements +/// let capacity = 3 * std::mem::size_of::(); +/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10); +/// // Copy the first 3 elements from arr1 +/// mutable.extend(0, 0, 3); +/// // Copy the last 3 elements from arr2 +/// mutable.extend(1, 2, 4); +/// // Complete the MutableArrayData into a new ArrayData +/// let frozen = mutable.freeze(); +/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10])); +/// ``` +pub struct FixedSizeBinaryMutableArrayData<'a> { + /// Input arrays: the data being read FROM. + /// + /// Note this is "dead code" because all actual references to the arrays are + /// stored in closures for extending values and nulls. + #[allow(dead_code)] + arrays: Vec<&'a ArrayData>, + + /// In progress output array: The data being written TO + /// + /// Note these fields are in a separate struct, [_MutableArrayData], as they + /// cannot be in [crate::transform::MutableArrayData] itself due to mutability invariants (interior + /// mutability): [crate::transform::MutableArrayData] contains a function that can only mutate + /// [_MutableArrayData], not [crate::transform::MutableArrayData] itself + data: _MutableArrayData<'a>, + + /// function used to extend the output array with nulls from input arrays. + /// + /// This function's lifetime is bound to the input arrays because it reads + /// nulls from it. + extend_null_bits: Vec>, +} + +impl<'a> std::fmt::Debug for FixedSizeBinaryMutableArrayData<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // ignores the closures. + f.debug_struct("FixedSizeBinaryMutableArrayData") + .field("data", &self.data) + .finish() + } +} + + +impl<'a> FixedSizeBinaryMutableArrayData<'a> { + // function that extends `[start..start+len]` to the mutable array. + fn extend_values(&mut self, array_index: usize, start: usize, len: usize) { + let array = self.arrays[array_index]; + + // TODO - can we avoid this by saving the size in the struct? as we know all data types are the same + let size = match array.data_type() { + DataType::FixedSizeBinary(i) => *i as usize, + _ => unreachable!(), + }; + + + let values = &array.buffers()[0].as_slice()[array.offset() * size..]; + let buffer = &mut self.data.buffer1; + buffer.extend_from_slice(&values[start * size..(start + len) * size]); + } + + /// Similar to [crate::transform::MutableArrayData::new], but lets users define the + /// preallocated capacities of the array with more granularity. + /// + /// See [crate::transform::MutableArrayData::new] for more information on the arguments. + /// + /// # Panics + /// + /// This function panics if the given `capacities` don't match the data type + /// of `arrays`. Or when a [crate::transform::Capacities] variant is not yet supported. + pub fn with_capacities( + arrays: Vec<&'a ArrayData>, + use_nulls: bool, + capacity: usize, + ) -> Self { + // TODO - assert fixed size binary + let data_type = arrays[0].data_type(); + + for a in arrays.iter() { + assert_eq!( + data_type, + a.data_type(), + "Arrays with inconsistent types passed to FixedBinaryMutableArrayData" + ) + } + + + // if any of the arrays has nulls, insertions from any array requires setting bits + // as there is at least one array with nulls. + let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0); + + let mut array_capacity; + + let [buffer1, buffer2] = { + array_capacity = *capacity; + new_buffers(data_type, *capacity) + }; + + let extend_null_bits = arrays + .iter() + .map(|array| build_extend_null_bits(array, use_nulls)) + .collect(); + + let null_buffer = use_nulls.then(|| { + let null_bytes = bit_util::ceil(array_capacity, 8); + MutableBuffer::from_len_zeroed(null_bytes) + }); + + let data = _MutableArrayData { + data_type: data_type.clone(), + len: 0, + null_count: 0, + null_buffer, + buffer1, + buffer2, + child_data: vec![], + }; + Self { + arrays, + data, + extend_null_bits, + } + } +} + +impl<'a> SpecializedMutableArrayData<'a> for FixedSizeBinaryMutableArrayData<'a> { + /// Returns a new [crate::transform::MutableArrayData] with capacity to `capacity` slots and + /// specialized to create an [ArrayData] from multiple `arrays`. + /// + /// # Arguments + /// * `arrays` - the source arrays to copy from + /// * `use_nulls` - a flag used to optimize insertions + /// - `false` if the only source of nulls are the arrays themselves + /// - `true` if the user plans to call [crate::transform::MutableArrayData::extend_nulls]. + /// * capacity - the preallocated capacity of the output array, in bytes + /// + /// Thus, if `use_nulls` is `false`, calling + /// [crate::transform::MutableArrayData::extend_nulls] should not be used. + fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self { + Self::with_capacities(arrays, use_nulls, capacity) + } + + /// Extends the in progress array with a region of the input arrays + /// + /// # Arguments + /// * `index` - the index of array that you what to copy values from + /// * `start` - the start index of the chunk (inclusive) + /// * `end` - the end index of the chunk (exclusive) + /// + /// # Panic + /// This function panics if there is an invalid index, + /// i.e. `index` >= the number of source arrays + /// or `end` > the length of the `index`th array + fn extend(&mut self, index: usize, start: usize, end: usize) { + let len = end - start; + (self.extend_null_bits[index])(&mut self.data, start, len); + self.extend_values(index, start, len); + self.data.len += len; + } + + /// Extends the in progress array with null elements, ignoring the input arrays. + /// + /// # Panics + /// + /// Panics if [`crate::transform::MutableArrayData`] not created with `use_nulls` or nullable source arrays + fn extend_nulls(&mut self, len: usize) { + self.data.len += len; + let bit_len = bit_util::ceil(self.data.len, 8); + let nulls = self.data.null_buffer(); + nulls.resize(bit_len, 0); + self.data.null_count += len; + + // TODO - can we avoid this by saving the size in the struct? as we know all data types are the same + let size = match self.data.data_type { + DataType::FixedSizeBinary(i) => i as usize, + _ => unreachable!(), + }; + + let values_buffer = &mut self.data.buffer1; + values_buffer.extend_zeros(len * size); + } + + /// Returns the current length + #[inline] + fn len(&self) -> usize { + self.data.len + } + + /// Returns true if len is 0 + #[inline] + fn is_empty(&self) -> bool { + self.data.len == 0 + } + + /// Returns the current null count + #[inline] + fn null_count(&self) -> usize { + self.data.null_count + } + + /// Consume self and returns the in progress array as [`ArrayDataBuilder`]. + /// + /// This is useful for extending the default behavior of MutableArrayData. + fn into_builder(self) -> ArrayDataBuilder { + let data = self.data; + + let buffers = vec![]; + + let child_data = data.child_data.into_iter().map(|x| x.freeze()).collect(); + + let nulls = data + .null_buffer + .map(|nulls| { + let bools = BooleanBuffer::new(nulls.into(), 0, data.len); + unsafe { NullBuffer::new_unchecked(bools, data.null_count) } + }) + .filter(|n| n.null_count() > 0); + + ArrayDataBuilder::new(data.data_type) + .offset(0) + .len(data.len) + .nulls(nulls) + .buffers(buffers) + .child_data(child_data) + } +} + diff --git a/arrow-data/src/transform/fixed_size_list.rs b/arrow-data/src/transform/fixed_size_list.rs index 8eef7bce9bb3..2b59a466f3a3 100644 --- a/arrow-data/src/transform/fixed_size_list.rs +++ b/arrow-data/src/transform/fixed_size_list.rs @@ -15,11 +15,15 @@ // specific language governing permissions and limitations // under the License. -use crate::ArrayData; -use arrow_schema::DataType; - -use super::{Extend, _MutableArrayData}; +use arrow_buffer::{bit_util, BooleanBuffer, Buffer, MutableBuffer, NullBuffer}; +use crate::{ArrayData, ArrayDataBuilder}; +use arrow_schema::{ArrowError, DataType, UnionMode}; +use crate::data::new_buffers; +use crate::transform::utils::build_extend_null_bits; +use super::{build_extend_dictionary, build_extend_nulls, build_extend_view, Capacities, Extend, ExtendNullBits, ExtendNulls, _MutableArrayData, preallocate_offset_and_binary_buffer, SpecializedMutableArrayData}; +// TODO - remove +#[deprecated] pub(super) fn build_extend(array: &ArrayData) -> Extend { let size = match array.data_type() { DataType::FixedSizeList(_, i) => *i as usize, @@ -36,6 +40,8 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { ) } +// TODO - remove +#[deprecated] pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { let size = match mutable.data_type { DataType::FixedSizeList(_, i) => i as usize, @@ -47,3 +53,276 @@ pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { .iter_mut() .for_each(|child| child.extend_nulls(len * size)) } + + + +/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by +/// copying chunks. +/// +/// The main use case of this struct is to perform unary operations to arrays of +/// arbitrary types, such as `filter` and `take`. +/// +/// # Example +/// ``` +/// use arrow_buffer::Buffer; +/// use arrow_data::ArrayData; +/// use arrow_data::transform::{MutableArrayData, SpecializedMutableArrayData}; +/// use arrow_schema::DataType; +/// fn i32_array(values: &[i32]) -> ArrayData { +/// ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap() +/// } +/// let arr1 = i32_array(&[1, 2, 3, 4, 5]); +/// let arr2 = i32_array(&[6, 7, 8, 9, 10]); +/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements +/// let capacity = 3 * std::mem::size_of::(); +/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10); +/// // Copy the first 3 elements from arr1 +/// mutable.extend(0, 0, 3); +/// // Copy the last 3 elements from arr2 +/// mutable.extend(1, 2, 4); +/// // Complete the MutableArrayData into a new ArrayData +/// let frozen = mutable.freeze(); +/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10])); +/// ``` +pub struct FixedSizeListMutableArrayData<'a> { + // TODO - can have specific array instead? + /// Input arrays: the data being read FROM. + /// + /// Note this is "dead code" because all actual references to the arrays are + /// stored in closures for extending values and nulls. + #[allow(dead_code)] + arrays: Vec<&'a ArrayData>, + + /// In progress output array: The data being written TO + /// + /// Note these fields are in a separate struct, [_MutableArrayData], as they + /// cannot be in [crate::transform::MutableArrayData] itself due to mutability invariants (interior + /// mutability): [crate::transform::MutableArrayData] contains a function that can only mutate + /// [_MutableArrayData], not [crate::transform::MutableArrayData] itself + data: _MutableArrayData<'a>, + + /// function used to extend the output array with nulls from input arrays. + /// + /// This function's lifetime is bound to the input arrays because it reads + /// nulls from it. + extend_null_bits: Vec>, +} + +impl<'a> std::fmt::Debug for FixedSizeListMutableArrayData<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // ignores the closures. + f.debug_struct("FixedSizeListMutableArrayData") + .field("data", &self.data) + .finish() + } +} + +impl<'a> FixedSizeListMutableArrayData<'a> { + + // function that extends `[start..start+len]` to the mutable array. + fn extend_values(&mut self, array_index: usize, start: usize, len: usize) { + // TODO - can we avoid this by saving the size in the struct? as we know all data types are the same + let size = match self.arrays[array_index].data_type() { + DataType::FixedSizeList(_, i) => *i as usize, + _ => unreachable!(), + }; + + self.data + .child_data + .iter_mut() + .for_each(|child| child.extend(array_index, start * size, (start + len) * size)) + } + + + /// Similar to [crate::transform::MutableArrayData::new], but lets users define the + /// preallocated capacities of the array with more granularity. + /// + /// See [crate::transform::MutableArrayData::new] for more information on the arguments. + /// + /// # Panics + /// + /// This function panics if the given `capacities` don't match the data type + /// of `arrays`. Or when a [crate::transform::Capacities] variant is not yet supported. + pub fn with_capacities( + arrays: Vec<&'a ArrayData>, + use_nulls: bool, + capacities: Capacities, + ) -> Self { + let data_type = arrays[0].data_type(); + + for a in arrays.iter().skip(1) { + assert_eq!( + data_type, + a.data_type(), + "Arrays with inconsistent types passed to MutableArrayData" + ) + } + + // if any of the arrays has nulls, insertions from any array requires setting bits + // as there is at least one array with nulls. + let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0); + + let mut array_capacity; + + let [buffer1, buffer2] = match &capacities { + Capacities::Array(capacity) => { + array_capacity = *capacity; + new_buffers(data_type, *capacity) + } + Capacities::List(capacity, _) => { + array_capacity = *capacity; + new_buffers(data_type, *capacity) + } + _ => panic!("Capacities: {capacities:?} not supported"), + }; + + let child_data = match data_type { + DataType::FixedSizeList(_, size) => { + let children = arrays + .iter() + .map(|array| &array.child_data()[0]) + .collect::>(); + let capacities = + if let Capacities::List(capacity, ref child_capacities) = capacities { + child_capacities + .clone() + .map(|c| *c) + .unwrap_or(Capacities::Array(capacity * *size as usize)) + } else { + Capacities::Array(array_capacity * *size as usize) + }; + vec![crate::transform::MutableArrayData::with_capacities( + children, use_nulls, capacities, + )] + } + _ => unreachable!(), + }; + + let extend_null_bits = arrays + .iter() + .map(|array| build_extend_null_bits(array, use_nulls)) + .collect(); + + let null_buffer = use_nulls.then(|| { + let null_bytes = bit_util::ceil(array_capacity, 8); + MutableBuffer::from_len_zeroed(null_bytes) + }); + + let data = _MutableArrayData { + data_type: data_type.clone(), + len: 0, + null_count: 0, + null_buffer, + buffer1, + buffer2, + child_data, + }; + Self { + arrays, + data, + extend_null_bits, + } + } +} + +impl<'a> SpecializedMutableArrayData<'a> for FixedSizeListMutableArrayData<'a> { + /// Returns a new [crate::transform::MutableArrayData] with capacity to `capacity` slots and + /// specialized to create an [ArrayData] from multiple `arrays`. + /// + /// # Arguments + /// * `arrays` - the source arrays to copy from + /// * `use_nulls` - a flag used to optimize insertions + /// - `false` if the only source of nulls are the arrays themselves + /// - `true` if the user plans to call [crate::transform::MutableArrayData::extend_nulls]. + /// * capacity - the preallocated capacity of the output array, in bytes + /// + /// Thus, if `use_nulls` is `false`, calling + /// [crate::transform::MutableArrayData::extend_nulls] should not be used. + fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self { + Self::with_capacities(arrays, use_nulls, crate::transform::Capacities::Array(capacity)) + } + + /// Extends the in progress array with a region of the input arrays + /// + /// # Arguments + /// * `index` - the index of array that you what to copy values from + /// * `start` - the start index of the chunk (inclusive) + /// * `end` - the end index of the chunk (exclusive) + /// + /// # Panic + /// This function panics if there is an invalid index, + /// i.e. `index` >= the number of source arrays + /// or `end` > the length of the `index`th array + fn extend(&mut self, index: usize, start: usize, end: usize) { + let len = end - start; + (self.extend_null_bits[index])(&mut self.data, start, len); + self.extend_values(index, start, len); + self.data.len += len; + } + + /// Extends the in progress array with null elements, ignoring the input arrays. + /// + /// # Panics + /// + /// Panics if [`crate::transform::MutableArrayData`] not created with `use_nulls` or nullable source arrays + fn extend_nulls(&mut self, len: usize) { + self.data.len += len; + let bit_len = bit_util::ceil(self.data.len, 8); + let nulls = self.data.null_buffer(); + nulls.resize(bit_len, 0); + self.data.null_count += len; + let size = match self.data.data_type { + DataType::FixedSizeList(_, i) => i as usize, + _ => unreachable!(), + }; + + self.data + .child_data + .iter_mut() + .for_each(|child| child.extend_nulls(len * size)) + } + + /// Returns the current length + #[inline] + fn len(&self) -> usize { + self.data.len + } + + /// Returns true if len is 0 + #[inline] + fn is_empty(&self) -> bool { + self.data.len == 0 + } + + /// Returns the current null count + #[inline] + fn null_count(&self) -> usize { + self.data.null_count + } + + /// Consume self and returns the in progress array as [`ArrayDataBuilder`]. + /// + /// This is useful for extending the default behavior of MutableArrayData. + fn into_builder(self) -> ArrayDataBuilder { + let data = self.data; + + let buffers = vec![]; + + let child_data = data.child_data.into_iter().map(|x| x.freeze()).collect(); + + let nulls = data + .null_buffer + .map(|nulls| { + let bools = BooleanBuffer::new(nulls.into(), 0, data.len); + unsafe { NullBuffer::new_unchecked(bools, data.null_count) } + }) + .filter(|n| n.null_count() > 0); + + ArrayDataBuilder::new(data.data_type) + .offset(0) + .len(data.len) + .nulls(nulls) + .buffers(buffers) + .child_data(child_data) + } +} diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs index edc68f1a107f..9c9e4b5eb679 100644 --- a/arrow-data/src/transform/mod.rs +++ b/arrow-data/src/transform/mod.rs @@ -23,6 +23,7 @@ use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode}; use half::f16; use num::Integer; use std::mem; +use crate::transform::utils::{_MutableArrayData, build_extend_null_bits}; mod boolean; mod fixed_binary; @@ -34,6 +35,12 @@ mod structure; mod union; mod utils; mod variable_size; +mod traits; + +pub use traits::SpecializedMutableArrayData; +pub use boolean::BooleanMutableArrayData; +pub use fixed_binary::FixedSizeBinaryMutableArrayData; +pub use fixed_size_list::FixedSizeListMutableArrayData; type ExtendNullBits<'a> = Box; // function that extends `[start..start+len]` to the mutable array. @@ -42,145 +49,6 @@ type Extend<'a> = Box; type ExtendNulls = Box; -/// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData]. -/// This is just a data container. -#[derive(Debug)] -struct _MutableArrayData<'a> { - pub data_type: DataType, - pub null_count: usize, - - pub len: usize, - pub null_buffer: Option, - - // arrow specification only allows up to 3 buffers (2 ignoring the nulls above). - // Thus, we place them in the stack to avoid bound checks and greater data locality. - pub buffer1: MutableBuffer, - pub buffer2: MutableBuffer, - pub child_data: Vec>, -} - -impl<'a> _MutableArrayData<'a> { - fn null_buffer(&mut self) -> &mut MutableBuffer { - self.null_buffer - .as_mut() - .expect("MutableArrayData not nullable") - } -} - -fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits { - if let Some(nulls) = array.nulls() { - let bytes = nulls.validity(); - Box::new(move |mutable, start, len| { - let mutable_len = mutable.len; - let out = mutable.null_buffer(); - utils::resize_for_bits(out, mutable_len + len); - mutable.null_count += set_bits( - out.as_slice_mut(), - bytes, - mutable_len, - nulls.offset() + start, - len, - ); - }) - } else if use_nulls { - Box::new(|mutable, _, len| { - let mutable_len = mutable.len; - let out = mutable.null_buffer(); - utils::resize_for_bits(out, mutable_len + len); - let write_data = out.as_slice_mut(); - (0..len).for_each(|i| { - bit_util::set_bit(write_data, mutable_len + i); - }); - }) - } else { - Box::new(|_, _, _| {}) - } -} - -/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by -/// copying chunks. -/// -/// The main use case of this struct is to perform unary operations to arrays of -/// arbitrary types, such as `filter` and `take`. -/// -/// # Example -/// ``` -/// use arrow_buffer::Buffer; -/// use arrow_data::ArrayData; -/// use arrow_data::transform::MutableArrayData; -/// use arrow_schema::DataType; -/// fn i32_array(values: &[i32]) -> ArrayData { -/// ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap() -/// } -/// let arr1 = i32_array(&[1, 2, 3, 4, 5]); -/// let arr2 = i32_array(&[6, 7, 8, 9, 10]); -/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements -/// let capacity = 3 * std::mem::size_of::(); -/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10); -/// // Copy the first 3 elements from arr1 -/// mutable.extend(0, 0, 3); -/// // Copy the last 3 elements from arr2 -/// mutable.extend(1, 2, 4); -/// // Complete the MutableArrayData into a new ArrayData -/// let frozen = mutable.freeze(); -/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10])); -/// ``` -pub struct MutableArrayData<'a> { - /// Input arrays: the data being read FROM. - /// - /// Note this is "dead code" because all actual references to the arrays are - /// stored in closures for extending values and nulls. - #[allow(dead_code)] - arrays: Vec<&'a ArrayData>, - - /// In progress output array: The data being written TO - /// - /// Note these fields are in a separate struct, [_MutableArrayData], as they - /// cannot be in [MutableArrayData] itself due to mutability invariants (interior - /// mutability): [MutableArrayData] contains a function that can only mutate - /// [_MutableArrayData], not [MutableArrayData] itself - data: _MutableArrayData<'a>, - - /// The child data of the `Array` in Dictionary arrays. - /// - /// This is not stored in `_MutableArrayData` because these values are - /// constant and only needed at the end, when freezing [_MutableArrayData]. - dictionary: Option, - - /// Variadic data buffers referenced by views. - /// - /// Note this this is not stored in `_MutableArrayData` because these values - /// are constant and only needed at the end, when freezing - /// [_MutableArrayData] - variadic_data_buffers: Vec, - - /// function used to extend output array with values from input arrays. - /// - /// This function's lifetime is bound to the input arrays because it reads - /// values from them. - extend_values: Vec>, - - /// function used to extend the output array with nulls from input arrays. - /// - /// This function's lifetime is bound to the input arrays because it reads - /// nulls from it. - extend_null_bits: Vec>, - - /// function used to extend the output array with null elements. - /// - /// This function is independent of the arrays and therefore has no lifetime. - extend_nulls: ExtendNulls, -} - -impl<'a> std::fmt::Debug for MutableArrayData<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - // ignores the closures. - f.debug_struct("MutableArrayData") - .field("data", &self.data) - .finish() - } -} - /// Builds an extend that adds `offset` to the source primitive /// Additionally validates that `max` fits into the /// the underlying primitive returning None if not @@ -345,6 +213,93 @@ fn preallocate_offset_and_binary_buffer( ] } + +/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by +/// copying chunks. +/// +/// The main use case of this struct is to perform unary operations to arrays of +/// arbitrary types, such as `filter` and `take`. +/// +/// # Example +/// ``` +/// use arrow_buffer::Buffer; +/// use arrow_data::ArrayData; +/// use arrow_data::transform::{MutableArrayData, SpecializedMutableArrayData}; +/// use arrow_schema::DataType; +/// fn i32_array(values: &[i32]) -> ArrayData { +/// ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap() +/// } +/// let arr1 = i32_array(&[1, 2, 3, 4, 5]); +/// let arr2 = i32_array(&[6, 7, 8, 9, 10]); +/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements +/// let capacity = 3 * std::mem::size_of::(); +/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10); +/// // Copy the first 3 elements from arr1 +/// mutable.extend(0, 0, 3); +/// // Copy the last 3 elements from arr2 +/// mutable.extend(1, 2, 4); +/// // Complete the MutableArrayData into a new ArrayData +/// let frozen = mutable.freeze(); +/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10])); +/// ``` +pub struct MutableArrayData<'a> { + // TODO - can have specific array instead? + /// Input arrays: the data being read FROM. + /// + /// Note this is "dead code" because all actual references to the arrays are + /// stored in closures for extending values and nulls. + #[allow(dead_code)] + arrays: Vec<&'a ArrayData>, + + /// In progress output array: The data being written TO + /// + /// Note these fields are in a separate struct, [_MutableArrayData], as they + /// cannot be in [MutableArrayData] itself due to mutability invariants (interior + /// mutability): [MutableArrayData] contains a function that can only mutate + /// [_MutableArrayData], not [MutableArrayData] itself + data: _MutableArrayData<'a>, + + /// The child data of the `Array` in Dictionary arrays. + /// + /// This is not stored in `_MutableArrayData` because these values are + /// constant and only needed at the end, when freezing [_MutableArrayData]. + dictionary: Option, + + /// Variadic data buffers referenced by views. + /// + /// Note this this is not stored in `_MutableArrayData` because these values + /// are constant and only needed at the end, when freezing + /// [_MutableArrayData] + variadic_data_buffers: Vec, + + /// function used to extend output array with values from input arrays. + /// + /// This function's lifetime is bound to the input arrays because it reads + /// values from them. + extend_values: Vec>, + + /// function used to extend the output array with nulls from input arrays. + /// + /// This function's lifetime is bound to the input arrays because it reads + /// nulls from it. + extend_null_bits: Vec>, + + /// function used to extend the output array with null elements. + /// + /// This function is independent of the arrays and therefore has no lifetime. + extend_nulls: ExtendNulls, +} + +impl<'a> std::fmt::Debug for MutableArrayData<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // ignores the closures. + f.debug_struct("MutableArrayData") + .field("data", &self.data) + .finish() + } +} + + /// Define capacities to pre-allocate for child data or data buffers. #[derive(Debug, Clone)] pub enum Capacities { @@ -377,22 +332,6 @@ pub enum Capacities { } impl<'a> MutableArrayData<'a> { - /// Returns a new [MutableArrayData] with capacity to `capacity` slots and - /// specialized to create an [ArrayData] from multiple `arrays`. - /// - /// # Arguments - /// * `arrays` - the source arrays to copy from - /// * `use_nulls` - a flag used to optimize insertions - /// - `false` if the only source of nulls are the arrays themselves - /// - `true` if the user plans to call [MutableArrayData::extend_nulls]. - /// * capacity - the preallocated capacity of the output array, in bytes - /// - /// Thus, if `use_nulls` is `false`, calling - /// [MutableArrayData::extend_nulls] should not be used. - pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self { - Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity)) - } - /// Similar to [MutableArrayData::new], but lets users define the /// preallocated capacities of the array with more granularity. /// @@ -699,6 +638,24 @@ impl<'a> MutableArrayData<'a> { extend_nulls, } } +} + +impl<'a> SpecializedMutableArrayData<'a> for MutableArrayData<'a> { + /// Returns a new [MutableArrayData] with capacity to `capacity` slots and + /// specialized to create an [ArrayData] from multiple `arrays`. + /// + /// # Arguments + /// * `arrays` - the source arrays to copy from + /// * `use_nulls` - a flag used to optimize insertions + /// - `false` if the only source of nulls are the arrays themselves + /// - `true` if the user plans to call [MutableArrayData::extend_nulls]. + /// * capacity - the preallocated capacity of the output array, in bytes + /// + /// Thus, if `use_nulls` is `false`, calling + /// [MutableArrayData::extend_nulls] should not be used. + fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self { + Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity)) + } /// Extends the in progress array with a region of the input arrays /// @@ -711,7 +668,7 @@ impl<'a> MutableArrayData<'a> { /// This function panics if there is an invalid index, /// i.e. `index` >= the number of source arrays /// or `end` > the length of the `index`th array - pub fn extend(&mut self, index: usize, start: usize, end: usize) { + fn extend(&mut self, index: usize, start: usize, end: usize) { let len = end - start; (self.extend_null_bits[index])(&mut self.data, start, len); (self.extend_values[index])(&mut self.data, index, start, len); @@ -723,7 +680,7 @@ impl<'a> MutableArrayData<'a> { /// # Panics /// /// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays - pub fn extend_nulls(&mut self, len: usize) { + fn extend_nulls(&mut self, len: usize) { self.data.len += len; let bit_len = bit_util::ceil(self.data.len, 8); let nulls = self.data.null_buffer(); @@ -734,31 +691,26 @@ impl<'a> MutableArrayData<'a> { /// Returns the current length #[inline] - pub fn len(&self) -> usize { + fn len(&self) -> usize { self.data.len } /// Returns true if len is 0 #[inline] - pub fn is_empty(&self) -> bool { + fn is_empty(&self) -> bool { self.data.len == 0 } /// Returns the current null count #[inline] - pub fn null_count(&self) -> usize { + fn null_count(&self) -> usize { self.data.null_count } - /// Creates a [ArrayData] from the in progress array, consuming `self`. - pub fn freeze(self) -> ArrayData { - unsafe { self.into_builder().build_unchecked() } - } - /// Consume self and returns the in progress array as [`ArrayDataBuilder`]. /// /// This is useful for extending the default behavior of MutableArrayData. - pub fn into_builder(self) -> ArrayDataBuilder { + fn into_builder(self) -> ArrayDataBuilder { let data = self.data; let buffers = match data.data_type { diff --git a/arrow-data/src/transform/traits.rs b/arrow-data/src/transform/traits.rs new file mode 100644 index 000000000000..2a3a16a06603 --- /dev/null +++ b/arrow-data/src/transform/traits.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{ArrayData, ArrayDataBuilder, MutableArrayData}; +use std::fmt::Debug; + +// TODO - ADD COMMENT and update comments +pub trait SpecializedMutableArrayData<'a>: Debug { + /// Returns a new [MutableArrayData] with capacity to `capacity` slots and + /// specialized to create an [ArrayData] from multiple `arrays`. + /// + /// # Arguments + /// * `arrays` - the source arrays to copy from + /// * `use_nulls` - a flag used to optimize insertions + /// - `false` if the only source of nulls are the arrays themselves + /// - `true` if the user plans to call [MutableArrayData::extend_nulls]. + /// * capacity - the preallocated capacity of the output array, in bytes + /// + /// Thus, if `use_nulls` is `false`, calling + /// [MutableArrayData::extend_nulls] should not be used. + fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self; + + /// Extends the in progress array with a region of the input arrays + /// + /// # Arguments + /// * `index` - the index of array that you what to copy values from + /// * `start` - the start index of the chunk (inclusive) + /// * `end` - the end index of the chunk (exclusive) + /// + /// # Panic + /// This function panics if there is an invalid index, + /// i.e. `index` >= the number of source arrays + /// or `end` > the length of the `index`th array + fn extend(&mut self, index: usize, start: usize, end: usize); + + /// Extends the in progress array with null elements, ignoring the input arrays. + /// + /// # Panics + /// + /// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays + fn extend_nulls(&mut self, len: usize); + + /// Returns the current length + fn len(&self) -> usize; + + /// Returns true if len is 0 + fn is_empty(&self) -> bool; + + /// Returns the current null count + fn null_count(&self) -> usize; + + /// Creates a [ArrayData] from the in progress array, consuming `self`. + fn freeze(self) -> ArrayData { + unsafe { self.into_builder().build_unchecked() } + } + + /// Consume self and returns the in progress array as [`ArrayDataBuilder`]. + /// + /// This is useful for extending the default behavior of MutableArrayData. + fn into_builder(self) -> ArrayDataBuilder; +} diff --git a/arrow-data/src/transform/utils.rs b/arrow-data/src/transform/utils.rs index 5407f68e0d0c..f6c17b49c984 100644 --- a/arrow-data/src/transform/utils.rs +++ b/arrow-data/src/transform/utils.rs @@ -17,6 +17,10 @@ use arrow_buffer::{bit_util, ArrowNativeType, MutableBuffer}; use num::{CheckedAdd, Integer}; +use arrow_buffer::bit_mask::set_bits; +use arrow_schema::DataType; +use crate::ArrayData; +use crate::transform::{utils, ExtendNullBits, MutableArrayData}; /// extends the `buffer` to be able to hold `len` bits, setting all bits of the new size to zero. #[inline] @@ -57,6 +61,62 @@ pub(super) unsafe fn get_last_offset(offset_buffer: &Mutable *offsets.get_unchecked(offsets.len() - 1) } + +/// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData]. +/// This is just a data container. +#[derive(Debug)] +pub(super) struct _MutableArrayData<'a> { + pub data_type: DataType, + pub null_count: usize, + + pub len: usize, + pub null_buffer: Option, + + // arrow specification only allows up to 3 buffers (2 ignoring the nulls above). + // Thus, we place them in the stack to avoid bound checks and greater data locality. + pub buffer1: MutableBuffer, + pub buffer2: MutableBuffer, + pub child_data: Vec>, +} + +impl<'a> _MutableArrayData<'a> { + pub(super) fn null_buffer(&mut self) -> &mut MutableBuffer { + self.null_buffer + .as_mut() + .expect("MutableArrayData not nullable") + } +} + +pub(super) fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits { + if let Some(nulls) = array.nulls() { + let bytes = nulls.validity(); + Box::new(move |mutable, start, len| { + let mutable_len = mutable.len; + let out = mutable.null_buffer(); + utils::resize_for_bits(out, mutable_len + len); + mutable.null_count += set_bits( + out.as_slice_mut(), + bytes, + mutable_len, + nulls.offset() + start, + len, + ); + }) + } else if use_nulls { + Box::new(|mutable, _, len| { + let mutable_len = mutable.len; + let out = mutable.null_buffer(); + utils::resize_for_bits(out, mutable_len + len); + let write_data = out.as_slice_mut(); + (0..len).for_each(|i| { + bit_util::set_bit(write_data, mutable_len + i); + }); + }) + } else { + Box::new(|_, _, _| {}) + } +} + #[cfg(test)] mod tests { use crate::transform::utils::extend_offsets;