Skip to content

Commit

Permalink
Refactor array_union and array_intersect functions to one general…
Browse files Browse the repository at this point in the history
… function (#8516)

* Refactor array_union and array_intersect functions

* fix cli

* fix ci

* add tests for null

* modify the return type

* update tests

* fix clippy

* fix clippy

* add tests for largelist

* fix clippy

* Add field parameter to generic_set_lists() function

* Add large array drop statements

* fix clippy
  • Loading branch information
Weijun-H authored Dec 27, 2023
1 parent 26a8000 commit 58b0a2b
Show file tree
Hide file tree
Showing 3 changed files with 446 additions and 144 deletions.
13 changes: 12 additions & 1 deletion datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,18 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplaceAll => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArraySlice => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayToString => Ok(Utf8),
BuiltinScalarFunction::ArrayUnion | BuiltinScalarFunction::ArrayIntersect => {
BuiltinScalarFunction::ArrayIntersect => {
match (input_expr_types[0].clone(), input_expr_types[1].clone()) {
(DataType::Null, DataType::Null) | (DataType::Null, _) => {
Ok(DataType::Null)
}
(_, DataType::Null) => {
Ok(List(Arc::new(Field::new("item", Null, true))))
}
(dt, _) => Ok(dt),
}
}
BuiltinScalarFunction::ArrayUnion => {
match (input_expr_types[0].clone(), input_expr_types[1].clone()) {
(DataType::Null, dt) => Ok(dt),
(dt, DataType::Null) => Ok(dt),
Expand Down
283 changes: 146 additions & 137 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::any::type_name;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::sync::Arc;

use arrow::array::*;
Expand Down Expand Up @@ -1777,97 +1778,173 @@ macro_rules! to_string {
}};
}

fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
#[derive(Debug, PartialEq)]
enum SetOp {
Union,
Intersect,
}

impl Display for SetOp {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SetOp::Union => write!(f, "array_union"),
SetOp::Intersect => write!(f, "array_intersect"),
}
}
}

fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
l: &GenericListArray<OffsetSize>,
r: &GenericListArray<OffsetSize>,
field: &FieldRef,
) -> Result<GenericListArray<OffsetSize>> {
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
field: Arc<Field>,
set_op: SetOp,
) -> Result<ArrayRef> {
if matches!(l.value_type(), DataType::Null) {
let field = Arc::new(Field::new("item", r.value_type(), true));
return general_array_distinct::<OffsetSize>(r, &field);
} else if matches!(r.value_type(), DataType::Null) {
let field = Arc::new(Field::new("item", l.value_type(), true));
return general_array_distinct::<OffsetSize>(l, &field);
}

let nulls = NullBuffer::union(l.nulls(), r.nulls());
let l_values = l.values().clone();
let r_values = r.values().clone();
let l_values = converter.convert_columns(&[l_values])?;
let r_values = converter.convert_columns(&[r_values])?;
if l.value_type() != r.value_type() {
return internal_err!("{set_op:?} is not implemented for '{l:?}' and '{r:?}'");
}

// Might be worth adding an upstream OffsetBufferBuilder
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
offsets.push(OffsetSize::usize_as(0));
let mut rows = Vec::with_capacity(l_values.num_rows() + r_values.num_rows());
let mut dedup = HashSet::new();
for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
for i in l_slice {
let left_row = l_values.row(i);
if dedup.insert(left_row) {
rows.push(left_row);
}
}
for i in r_slice {
let right_row = r_values.row(i);
if dedup.insert(right_row) {
rows.push(right_row);
let dt = l.value_type();

let mut offsets = vec![OffsetSize::usize_as(0)];
let mut new_arrays = vec![];

let converter = RowConverter::new(vec![SortField::new(dt)])?;
for (first_arr, second_arr) in l.iter().zip(r.iter()) {
if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) {
let l_values = converter.convert_columns(&[first_arr])?;
let r_values = converter.convert_columns(&[second_arr])?;

let l_iter = l_values.iter().sorted().dedup();
let values_set: HashSet<_> = l_iter.clone().collect();
let mut rows = if set_op == SetOp::Union {
l_iter.collect::<Vec<_>>()
} else {
vec![]
};
for r_val in r_values.iter().sorted().dedup() {
match set_op {
SetOp::Union => {
if !values_set.contains(&r_val) {
rows.push(r_val);
}
}
SetOp::Intersect => {
if values_set.contains(&r_val) {
rows.push(r_val);
}
}
}
}

let last_offset = match offsets.last().copied() {
Some(offset) => offset,
None => return internal_err!("offsets should not be empty"),
};
offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
let arrays = converter.convert_rows(rows)?;
let array = match arrays.first() {
Some(array) => array.clone(),
None => {
return internal_err!("{set_op}: failed to get array from rows");
}
};
new_arrays.push(array);
}
offsets.push(OffsetSize::usize_as(rows.len()));
dedup.clear();
}

let values = converter.convert_rows(rows)?;
let offsets = OffsetBuffer::new(offsets.into());
let result = values[0].clone();
Ok(GenericListArray::<OffsetSize>::new(
field.clone(),
offsets,
result,
nulls,
))
let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
let values = compute::concat(&new_arrays_ref)?;
let arr = GenericListArray::<OffsetSize>::try_new(field, offsets, values, None)?;
Ok(Arc::new(arr))
}

/// Array_union SQL function
pub fn array_union(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!("array_union needs 2 arguments");
}
let array1 = &args[0];
let array2 = &args[1];
fn general_set_op(
array1: &ArrayRef,
array2: &ArrayRef,
set_op: SetOp,
) -> Result<ArrayRef> {
match (array1.data_type(), array2.data_type()) {
(DataType::Null, DataType::List(field)) => {
if set_op == SetOp::Intersect {
return Ok(new_empty_array(&DataType::Null));
}
let array = as_list_array(&array2)?;
general_array_distinct::<i32>(array, field)
}

fn union_arrays<O: OffsetSizeTrait>(
array1: &ArrayRef,
array2: &ArrayRef,
l_field_ref: &Arc<Field>,
r_field_ref: &Arc<Field>,
) -> Result<ArrayRef> {
match (l_field_ref.data_type(), r_field_ref.data_type()) {
(DataType::Null, _) => Ok(array2.clone()),
(_, DataType::Null) => Ok(array1.clone()),
(_, _) => {
let list1 = array1.as_list::<O>();
let list2 = array2.as_list::<O>();
let result = union_generic_lists::<O>(list1, list2, l_field_ref)?;
Ok(Arc::new(result))
(DataType::List(field), DataType::Null) => {
if set_op == SetOp::Intersect {
return make_array(&[]);
}
let array = as_list_array(&array1)?;
general_array_distinct::<i32>(array, field)
}
}
(DataType::Null, DataType::LargeList(field)) => {
if set_op == SetOp::Intersect {
return Ok(new_empty_array(&DataType::Null));
}
let array = as_large_list_array(&array2)?;
general_array_distinct::<i64>(array, field)
}
(DataType::LargeList(field), DataType::Null) => {
if set_op == SetOp::Intersect {
return make_array(&[]);
}
let array = as_large_list_array(&array1)?;
general_array_distinct::<i64>(array, field)
}
(DataType::Null, DataType::Null) => Ok(new_empty_array(&DataType::Null)),

match (array1.data_type(), array2.data_type()) {
(DataType::Null, _) => Ok(array2.clone()),
(_, DataType::Null) => Ok(array1.clone()),
(DataType::List(l_field_ref), DataType::List(r_field_ref)) => {
union_arrays::<i32>(array1, array2, l_field_ref, r_field_ref)
(DataType::List(field), DataType::List(_)) => {
let array1 = as_list_array(&array1)?;
let array2 = as_list_array(&array2)?;
generic_set_lists::<i32>(array1, array2, field.clone(), set_op)
}
(DataType::LargeList(l_field_ref), DataType::LargeList(r_field_ref)) => {
union_arrays::<i64>(array1, array2, l_field_ref, r_field_ref)
(DataType::LargeList(field), DataType::LargeList(_)) => {
let array1 = as_large_list_array(&array1)?;
let array2 = as_large_list_array(&array2)?;
generic_set_lists::<i64>(array1, array2, field.clone(), set_op)
}
_ => {
(data_type1, data_type2) => {
internal_err!(
"array_union only support list with offsets of type int32 and int64"
"{set_op} does not support types '{data_type1:?}' and '{data_type2:?}'"
)
}
}
}

/// Array_union SQL function
pub fn array_union(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!("array_union needs two arguments");
}
let array1 = &args[0];
let array2 = &args[1];

general_set_op(array1, array2, SetOp::Union)
}

/// array_intersect SQL function
pub fn array_intersect(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!("array_intersect needs two arguments");
}

let array1 = &args[0];
let array2 = &args[1];

general_set_op(array1, array2, SetOp::Intersect)
}

/// Array_to_string SQL function
pub fn array_to_string(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() < 2 || args.len() > 3 {
Expand Down Expand Up @@ -2228,7 +2305,7 @@ pub fn array_has(args: &[ArrayRef]) -> Result<ArrayRef> {
DataType::LargeList(_) => {
general_array_has_dispatch::<i64>(&args[0], &args[1], ComparisonType::Single)
}
_ => internal_err!("array_has does not support type '{array_type:?}'."),
_ => exec_err!("array_has does not support type '{array_type:?}'."),
}
}

Expand Down Expand Up @@ -2359,74 +2436,6 @@ pub fn string_to_array<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef
Ok(Arc::new(list_array) as ArrayRef)
}

/// array_intersect SQL function
pub fn array_intersect(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!("array_intersect needs two arguments");
}

let first_array = &args[0];
let second_array = &args[1];

match (first_array.data_type(), second_array.data_type()) {
(DataType::Null, _) => Ok(second_array.clone()),
(_, DataType::Null) => Ok(first_array.clone()),
_ => {
let first_array = as_list_array(&first_array)?;
let second_array = as_list_array(&second_array)?;

if first_array.value_type() != second_array.value_type() {
return internal_err!("array_intersect is not implemented for '{first_array:?}' and '{second_array:?}'");
}

let dt = first_array.value_type();

let mut offsets = vec![0];
let mut new_arrays = vec![];

let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
for (first_arr, second_arr) in first_array.iter().zip(second_array.iter()) {
if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) {
let l_values = converter.convert_columns(&[first_arr])?;
let r_values = converter.convert_columns(&[second_arr])?;

let values_set: HashSet<_> = l_values.iter().collect();
let mut rows = Vec::with_capacity(r_values.num_rows());
for r_val in r_values.iter().sorted().dedup() {
if values_set.contains(&r_val) {
rows.push(r_val);
}
}

let last_offset: i32 = match offsets.last().copied() {
Some(offset) => offset,
None => return internal_err!("offsets should not be empty"),
};
offsets.push(last_offset + rows.len() as i32);
let arrays = converter.convert_rows(rows)?;
let array = match arrays.first() {
Some(array) => array.clone(),
None => {
return internal_err!(
"array_intersect: failed to get array from rows"
)
}
};
new_arrays.push(array);
}
}

let field = Arc::new(Field::new("item", dt, true));
let offsets = OffsetBuffer::new(offsets.into());
let new_arrays_ref =
new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
let values = compute::concat(&new_arrays_ref)?;
let arr = Arc::new(ListArray::try_new(field, offsets, values, None)?);
Ok(arr)
}
}
}

pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
array: &GenericListArray<OffsetSize>,
field: &FieldRef,
Expand Down
Loading

0 comments on commit 58b0a2b

Please sign in to comment.