Skip to content

Commit

Permalink
Improved null handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Sep 1, 2023
1 parent 6a5c0cf commit 93fb4da
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 27 deletions.
3 changes: 3 additions & 0 deletions arrow-select/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
concat_fallback(arrays, capacity)
}

/// Concatenates arrays using MutableArrayData
///
/// This will naively concatenate dictionaries
fn concat_fallback(
arrays: &[&dyn Array],
capacity: Capacities,
Expand Down
89 changes: 62 additions & 27 deletions arrow-select/src/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow_array::builder::BooleanBufferBuilder;
use arrow_array::cast::AsArray;
use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType};
use arrow_array::{Array, ArrayRef, DictionaryArray, GenericByteArray};
use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer};
use arrow_buffer::{ArrowNativeType, BooleanBuffer, ScalarBuffer};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType};

Expand All @@ -36,6 +36,10 @@ struct Interner<'a, V> {
}

impl<'a, V> Interner<'a, V> {
/// Capacity controls the number of unique buckets allocated within the Interner
///
/// A larger capacity reduces the probability of hash collisions, and should be set
/// based on an approximation of the upper bound of unique values
fn new(capacity: usize) -> Self {
// Add additional buckets to help reduce collisions
let shift = (capacity as u64 + 128).leading_zeros();
Expand Down Expand Up @@ -77,7 +81,7 @@ pub struct MergedDictionaries<K: ArrowDictionaryKeyType> {
}

/// A weak heuristic of whether to merge dictionary values that aims to only
/// perform the expensive computation when is likely to yield at least
/// perform the expensive merge computation when it is likely to yield at least
/// some return over the naive approach used by MutableArrayData
///
/// `len` is the total length of the merged output
Expand Down Expand Up @@ -105,10 +109,14 @@ pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>(
!single_dictionary && is_supported && (overflow || values_exceed_length)
}

/// Given an array of dictionaries and an optional row mask compute a values array
/// Given an array of dictionaries and an optional key mask compute a values array
/// containing referenced values, along with mappings from the [`DictionaryArray`]
/// keys to the new keys within this values array. Best-effort will be made to ensure
/// that the dictionary values are unique
///
/// This method is meant to be very fast and the output dictionary values
/// may not be unique, unlike `GenericByteDictionaryBuilder` which is slower
/// but produces unique values
pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>(
dictionaries: &[(&DictionaryArray<K>, Option<BooleanBuffer>)],
) -> Result<MergedDictionaries<K>, ArrowError> {
Expand All @@ -118,10 +126,14 @@ pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>(
let mut value_slices = Vec::with_capacity(dictionaries.len());

for (dictionary, key_mask) in dictionaries {
let values_mask = match key_mask {
Some(key_mask) => compute_values_mask(dictionary, key_mask.set_indices()),
None => compute_values_mask(dictionary, 0..dictionary.len()),
let key_mask = match (dictionary.logical_nulls(), key_mask) {
(Some(n), None) => Some(n.into_inner()),
(None, Some(n)) => Some(n.clone()),
(Some(n), Some(m)) => Some(n.inner() & m),
(None, None) => None,
};
let keys = dictionary.keys().values();
let values_mask = compute_values_mask(keys, key_mask.as_ref());
let v = dictionary.values().as_ref();
num_values += v.len();
value_slices.push(get_masked_values(v, &values_mask));
Expand Down Expand Up @@ -165,25 +177,20 @@ pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>(

/// Return a mask identifying the values that are referenced by keys in `dictionary`
/// at the positions indicated by `selection`
fn compute_values_mask<K, I>(
dictionary: &DictionaryArray<K>,
selection: I,
) -> BooleanBuffer
where
K: ArrowDictionaryKeyType,
I: IntoIterator<Item = usize>,
{
let len = dictionary.values().len();
let mut builder =
BooleanBufferBuilder::new_from_buffer(MutableBuffer::new_null(len), len);

let keys = dictionary.keys();

for i in selection {
if keys.is_valid(i) {
let key = keys.values()[i];
builder.set_bit(key.as_usize(), true)
}
/// Return a mask identifying the dictionary values referenced by `keys`.
///
/// * `keys` - the raw dictionary keys; each key is interpreted as an index
///   into the dictionary's values array
/// * `mask` - when `Some`, only the keys at the set positions of the mask are
///   considered; when `None`, every key is considered
///
/// Bit `i` of the returned buffer is set if at least one considered key equals
/// `i`. The buffer is at least `keys.len()` bits long and is extended whenever
/// a considered key refers to a higher index, so a short key array pointing
/// into a larger dictionary never writes outside the bitmap.
fn compute_values_mask<K: ArrowNativeType>(
    keys: &ScalarBuffer<K>,
    mask: Option<&BooleanBuffer>,
) -> BooleanBuffer {
    let mut builder = BooleanBufferBuilder::new(keys.len());
    builder.advance(keys.len());

    // The bitmap is indexed by key *value*, not by key position, so it must be
    // grown when a key refers to a value index beyond its current length.
    // Without this, `set_bit` would either panic or set a bit that is dropped
    // when the builder is finished.
    let mut mark_referenced = |key: K| {
        let value_idx = key.as_usize();
        if value_idx >= builder.len() {
            builder.advance(value_idx - builder.len() + 1);
        }
        builder.set_bit(value_idx, true);
    };

    match mask {
        Some(selected) => selected
            .set_indices()
            .for_each(|idx| mark_referenced(keys[idx])),
        None => keys.iter().for_each(|k| mark_referenced(*k)),
    }
    builder.finish()
}
Expand Down Expand Up @@ -221,8 +228,9 @@ mod tests {
use crate::dictionary::merge_dictionary_values;
use arrow_array::cast::as_string_array;
use arrow_array::types::Int32Type;
use arrow_array::DictionaryArray;
use arrow_buffer::BooleanBuffer;
use arrow_array::{DictionaryArray, Int32Array, StringArray};
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
use std::sync::Arc;

#[test]
fn test_merge_strings() {
Expand Down Expand Up @@ -263,4 +271,31 @@ mod tests {
assert_eq!(&merged.key_mappings[0], &[0, 0, 1, 0, 0]);
assert_eq!(&merged.key_mappings[1], &[2, 3, 1, 4]);
}

#[test]
fn test_merge_nulls() {
    // Dictionary values: five 5-byte strings, the second of which is null
    let value_validity = NullBuffer::from(vec![true, false, true, true, true]);
    let value_offsets = OffsetBuffer::from_lengths([5, 5, 5, 5, 5]);
    let value_bytes = Buffer::from("helloworldbingohelloworld");
    let dict_values = StringArray::new(value_offsets, value_bytes, Some(value_validity));

    // Keys: positions 2 and 4 are null; the key 8 sits under a null slot and
    // does not correspond to any of the five values
    let key_validity =
        NullBuffer::from(vec![true, true, false, true, false, true, true]);
    let raw_keys = vec![1, 2, 3, 1, 8, 2, 3];
    let dict_keys = Int32Array::new(raw_keys.into(), Some(key_validity));

    // Logical contents:
    // [NULL, "bingo", NULL, NULL, NULL, "bingo", "hello"]
    let with_nulls = DictionaryArray::new(dict_keys, Arc::new(dict_values));

    // A second dictionary made up entirely of null keys over an empty values array
    let all_null = DictionaryArray::new(
        Int32Array::new_null(10),
        Arc::new(StringArray::new_null(0)),
    );

    let merged =
        merge_dictionary_values(&[(&with_nulls, None), (&all_null, None)]).unwrap();

    // Only the values reachable through non-null keys and non-null values remain
    let expected = StringArray::from(vec!["bingo", "hello"]);
    assert_eq!(merged.values.as_ref(), &expected);

    // One mapping per input dictionary
    assert_eq!(merged.key_mappings.len(), 2);
    assert_eq!(&merged.key_mappings[0], &[0, 0, 0, 1, 0]);
    assert_eq!(&merged.key_mappings[1], &[]);
}
}

0 comments on commit 93fb4da

Please sign in to comment.