Skip to content

Commit

Permalink
Specialize interleave dictionary (apache#2944)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 28, 2022
1 parent b6f08a8 commit 61544a5
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 4 deletions.
1 change: 1 addition & 0 deletions arrow-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ arrow-buffer = { version = "25.0.0", path = "../arrow-buffer" }
arrow-data = { version = "25.0.0", path = "../arrow-data" }
arrow-schema = { version = "25.0.0", path = "../arrow-schema" }
arrow-array = { version = "25.0.0", path = "../arrow-array" }
hashbrown = { version = "0.12", default-features = false, features = ["ahash", "inline-more"] }
num = { version = "0.4", default-features = false, features = ["std"] }

[features]
Expand Down
148 changes: 144 additions & 4 deletions arrow-select/src/interleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
// under the License.

use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::{
downcast_primitive, make_array, new_empty_array, Array, ArrayRef, ArrowPrimitiveType,
GenericStringArray, OffsetSizeTrait, PrimitiveArray,
downcast_integer, downcast_primitive, make_array, new_empty_array, Array, ArrayRef,
ArrowPrimitiveType, DictionaryArray, GenericStringArray, OffsetSizeTrait,
PrimitiveArray,
};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_buffer::bit_util::get_bit;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::transform::MutableArrayData;
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType};
use hashbrown::HashMap;
use std::sync::Arc;

macro_rules! primitive_helper {
Expand All @@ -32,6 +36,12 @@ macro_rules! primitive_helper {
};
}

macro_rules! dictionary_helper {
($t:ty, $values:ident, $indices:ident, $data_type:ident) => {
interleave_dictionary::<$t>($values, $indices, $data_type)
};
}

///
/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
///
Expand Down Expand Up @@ -87,6 +97,10 @@ pub fn interleave(
data_type => (primitive_helper, values, indices, data_type),
DataType::Utf8 => interleave_string::<i32>(values, indices, data_type),
DataType::LargeUtf8 => interleave_string::<i64>(values, indices, data_type),
DataType::Dictionary(k, _) => downcast_integer! {
k.as_ref() => (dictionary_helper, values, indices, data_type),
_ => unreachable!(),
}
_ => interleave_fallback(values, indices)
}
}
Expand Down Expand Up @@ -189,6 +203,74 @@ fn interleave_string<O: OffsetSizeTrait>(
Ok(Arc::new(GenericStringArray::<O>::from(data)))
}

/// Interleaves dictionary arrays
///
///
/// This will only copy dictionary values used by the output selection. However, the
/// resulting dictionary may contain duplicates if the source dictionaries contain duplicates
/// or the same value appears in multiple source arrays
fn interleave_dictionary<K>(
values: &[&dyn Array],
indices: &[(usize, usize)],
data_type: &DataType,
) -> Result<ArrayRef, ArrowError>
where
K: ArrowDictionaryKeyType,
K::Native: std::hash::Hash + Eq,
{
let interleaved = Interleave::<'_, DictionaryArray<K>>::new(values, indices);
let mut value_indices = Vec::with_capacity(indices.len());
let mut keys = MutableBuffer::new(indices.len());

// Map from (array,key) to output key
let mut mapping: HashMap<(usize, K::Native), K::Native> =
HashMap::with_capacity(indices.len());

// Given an array index and key, updates mapping and value_indices, returning the new key
let mut intern_key = |a: usize, k: K::Native| -> K::Native {
*mapping.entry((a, k)).or_insert_with(|| {
let new_key = K::Native::from_usize(value_indices.len()).expect("overflow");
value_indices.push((a, k.as_usize()));
new_key
})
};

// Iterate through identifying selected dictionary keys
match &interleaved.nulls {
Some(nulls) => {
for (idx, (a, b)) in indices.iter().enumerate() {
keys.push(match get_bit(nulls.as_ref(), idx) {
true => intern_key(*a, interleaved.arrays[*a].keys().value(*b)),
false => K::Native::default(),
})
}
}
None => {
for (a, b) in indices {
keys.push(intern_key(*a, interleaved.arrays[*a].keys().value(*b)))
}
}
}

// Copy across only values that were selected
let values: Vec<_> = interleaved
.arrays
.iter()
.map(|x| x.values().as_ref())
.collect();
let child_data = interleave(&values, &value_indices)?.data().clone();

let builder = ArrayDataBuilder::new(data_type.clone())
.len(indices.len())
.add_buffer(keys.into())
.add_child_data(child_data)
.null_bit_buffer(interleaved.nulls)
.null_count(interleaved.null_count);

let data = unsafe { builder.build_unchecked() };
Ok(Arc::new(DictionaryArray::<K>::from(data)))
}

/// Fallback implementation of interleave using [`MutableArrayData`]
fn interleave_fallback(
values: &[&dyn Array],
Expand Down Expand Up @@ -226,7 +308,7 @@ fn interleave_fallback(
mod tests {
use super::*;
use arrow_array::builder::{Int32Builder, ListBuilder};
use arrow_array::cast::{as_primitive_array, as_string_array};
use arrow_array::cast::{as_dictionary_array, as_primitive_array, as_string_array};
use arrow_array::types::Int32Type;
use arrow_array::{Int32Array, ListArray, StringArray};
use arrow_schema::DataType;
Expand Down Expand Up @@ -282,6 +364,64 @@ mod tests {
)
}

#[test]
fn test_string_dictionaries() {
let a = DictionaryArray::<Int32Type>::from_iter([
Some("a"),
Some("b"),
None,
Some("b"),
Some("b"),
Some("a"),
]);

let b = DictionaryArray::<Int32Type>::from_iter([
Some("a"),
Some("c"),
None,
Some("c"),
Some("c"),
Some("d"),
]);

let interleaved =
interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1), (0, 2)])
.unwrap();

let result = as_dictionary_array::<Int32Type>(interleaved.as_ref())
.downcast_dict::<StringArray>()
.unwrap();

let r: Vec<_> = result.into_iter().collect();
assert_eq!(r, vec![None, None, Some("a"), Some("c"), Some("b"), None]);
}

#[test]
fn test_dictionary_nulls() {
let child = Int32Array::from(vec![0]).into_data();
let dictionary = ArrayDataBuilder::new(DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Int32),
))
.len(2)
.add_buffer(Buffer::from_iter([-1_i32, 0_i32]))
.null_bit_buffer(Some(Buffer::from_slice_ref(&[0b00000010])))
.null_count(1)
.add_child_data(child)
.build()
.unwrap();

let dictionary = DictionaryArray::<Int32Type>::from(dictionary);
let interleaved = interleave(&[&dictionary], &[(0, 0), (0, 1)]).unwrap();

let result = as_dictionary_array::<Int32Type>(interleaved.as_ref())
.downcast_dict::<Int32Array>()
.unwrap();

let r: Vec<_> = result.into_iter().collect();
assert_eq!(r, vec![None, Some(0)]);
}

#[test]
fn test_lists() {
// [[1, 2], null, [3]]
Expand Down
6 changes: 6 additions & 0 deletions arrow/benches/interleave_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,17 @@ fn add_benchmark(c: &mut Criterion) {
let string = create_string_array_with_len::<i32>(1024, 0., 20);
let string_opt = create_string_array_with_len::<i32>(1024, 0.5, 20);

let values = create_string_array_with_len::<i32>(10, 0., 20);
let dict = create_dict_from_values::<Int32Type>(1024, 0., &values);
let dict_opt = create_dict_from_values::<Int32Type>(1024, 0.5, &values);

let cases: &[(&str, &dyn Array)] = &[
("i32(0.0)", &i32),
("i32(0.5)", &i32_opt),
("str(20, 0.0)", &string),
("str(20, 0.5)", &string_opt),
("dict(10, 0.0, str(20, 0.0))", &dict),
("dict(10, 0.5, str(20, 0.0)", &dict_opt),
];

for (prefix, base) in cases {
Expand Down

0 comments on commit 61544a5

Please sign in to comment.