diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 0099da56e833..0a298532fdf6 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -144,6 +144,9 @@ pub fn concat(arrays: &[&dyn Array]) -> Result { concat_fallback(arrays, capacity) } +/// Concatenates arrays using MutableArrayData +/// +/// This will naively concatenate dictionaries fn concat_fallback( arrays: &[&dyn Array], capacity: Capacities, diff --git a/arrow-select/src/dictionary.rs b/arrow-select/src/dictionary.rs index a38e0ac82739..5171b1fd7f9c 100644 --- a/arrow-select/src/dictionary.rs +++ b/arrow-select/src/dictionary.rs @@ -21,7 +21,7 @@ use arrow_array::builder::BooleanBufferBuilder; use arrow_array::cast::AsArray; use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType}; use arrow_array::{Array, ArrayRef, DictionaryArray, GenericByteArray}; -use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer}; +use arrow_buffer::{ArrowNativeType, BooleanBuffer, ScalarBuffer}; use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType}; @@ -36,6 +36,10 @@ struct Interner<'a, V> { } impl<'a, V> Interner<'a, V> { + /// Capacity controls the number of unique buckets allocated within the Interner + /// + /// A larger capacity reduces the probability of hash collisions, and should be set + /// based on an approximation of the upper bound of unique values fn new(capacity: usize) -> Self { // Add additional buckets to help reduce collisions let shift = (capacity as u64 + 128).leading_zeros(); @@ -77,7 +81,7 @@ pub struct MergedDictionaries { } /// A weak heuristic of whether to merge dictionary values that aims to only -/// perform the expensive computation when is likely to yield at least +/// perform the expensive merge computation when it is likely to yield at least /// some return over the naive approach used by MutableArrayData /// /// `len` is the total length of the merged output @@ -105,10 +109,14 @@ pub fn should_merge_dictionary_values( !single_dictionary && is_supported && (overflow || values_exceed_length) } -/// Given an array of dictionaries and an optional row mask compute a values array +/// Given an array of dictionaries and an optional key mask compute a values array /// containing referenced values, along with mappings from the [`DictionaryArray`] /// keys to the new keys within this values array. Best-effort will be made to ensure /// that the dictionary values are unique +/// +/// This method is meant to be very fast and the output dictionary values +/// may not be unique, unlike `GenericByteDictionaryBuilder` which is slower +/// but produces unique values pub fn merge_dictionary_values( dictionaries: &[(&DictionaryArray, Option)], ) -> Result, ArrowError> { @@ -118,10 +126,14 @@ pub fn merge_dictionary_values( let mut value_slices = Vec::with_capacity(dictionaries.len()); for (dictionary, key_mask) in dictionaries { - let values_mask = match key_mask { - Some(key_mask) => compute_values_mask(dictionary, key_mask.set_indices()), - None => compute_values_mask(dictionary, 0..dictionary.len()), + let key_mask = match (dictionary.logical_nulls(), key_mask) { + (Some(n), None) => Some(n.into_inner()), + (None, Some(n)) => Some(n.clone()), + (Some(n), Some(m)) => Some(n.inner() & m), + (None, None) => None, }; + let keys = dictionary.keys().values(); + let values_mask = compute_values_mask(keys, key_mask.as_ref()); let v = dictionary.values().as_ref(); num_values += v.len(); value_slices.push(get_masked_values(v, &values_mask)); @@ -165,25 +177,20 @@ pub fn merge_dictionary_values( /// Return a mask identifying the values that are referenced by keys in `dictionary` /// at the positions indicated by `selection` -fn compute_values_mask( - dictionary: &DictionaryArray, - selection: I, -) -> BooleanBuffer -where - K: ArrowDictionaryKeyType, - I: IntoIterator, -{ - let len = dictionary.values().len(); - let mut builder = - BooleanBufferBuilder::new_from_buffer(MutableBuffer::new_null(len), len); - - let keys = dictionary.keys(); - - for i in selection { - if keys.is_valid(i) { - let key = keys.values()[i]; - builder.set_bit(key.as_usize(), true) - } +fn compute_values_mask( + keys: &ScalarBuffer, + mask: Option<&BooleanBuffer>, +) -> BooleanBuffer { + let mut builder = BooleanBufferBuilder::new(keys.len()); + builder.advance(keys.len()); + + match mask { + Some(n) => n + .set_indices() + .for_each(|idx| builder.set_bit(keys[idx].as_usize(), true)), + None => keys + .iter() + .for_each(|k| builder.set_bit(k.as_usize(), true)), } builder.finish() } @@ -221,8 +228,9 @@ mod tests { use crate::dictionary::merge_dictionary_values; use arrow_array::cast::as_string_array; use arrow_array::types::Int32Type; - use arrow_array::DictionaryArray; - use arrow_buffer::BooleanBuffer; + use arrow_array::{DictionaryArray, Int32Array, StringArray}; + use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer}; + use std::sync::Arc; #[test] fn test_merge_strings() { @@ -263,4 +271,31 @@ mod tests { assert_eq!(&merged.key_mappings[0], &[0, 0, 1, 0, 0]); assert_eq!(&merged.key_mappings[1], &[2, 3, 1, 4]); } + + #[test] + fn test_merge_nulls() { + let buffer = Buffer::from("helloworldbingohelloworld"); + let offsets = OffsetBuffer::from_lengths([5, 5, 5, 5, 5]); + let nulls = NullBuffer::from(vec![true, false, true, true, true]); + let values = StringArray::new(offsets, buffer, Some(nulls)); + + let key_values = vec![1, 2, 3, 1, 8, 2, 3]; + let key_nulls = + NullBuffer::from(vec![true, true, false, true, false, true, true]); + let keys = Int32Array::new(key_values.into(), Some(key_nulls)); + let a = DictionaryArray::new(keys, Arc::new(values)); + // [NULL, "bingo", NULL, NULL, NULL, "bingo", "hello"] + + let b = DictionaryArray::new( + Int32Array::new_null(10), + Arc::new(StringArray::new_null(0)), + ); + + let merged = merge_dictionary_values(&[(&a, None), (&b, None)]).unwrap(); + let expected = StringArray::from(vec!["bingo", "hello"]); + assert_eq!(merged.values.as_ref(), &expected); + assert_eq!(merged.key_mappings.len(), 2); + assert_eq!(&merged.key_mappings[0], &[0, 0, 0, 1, 0]); + assert_eq!(&merged.key_mappings[1], &[]); + } }