Skip to content

Commit

Permalink
Fix redundant data copying in unnest (#13441)
Browse files Browse the repository at this point in the history
* Fix redundant data copying in unnest

* Add test

* fix typo
  • Loading branch information
demetribu authored Nov 18, 2024
1 parent 6b0570b commit 189536b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 9 deletions.
62 changes: 53 additions & 9 deletions datafusion/physical-plan/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use arrow::compute::kernels::zip::zip;
use arrow::compute::{cast, is_not_null, kernels, sum};
use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::{Int64Array, Scalar, StructArray};
use arrow_array::{new_null_array, Int64Array, Scalar, StructArray};
use arrow_ord::cmp::lt;
use datafusion_common::{
exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
Expand Down Expand Up @@ -453,16 +453,36 @@ fn list_unnest_at_level(

// Create the take indices array for other columns
let take_indices = create_take_indicies(unnested_length, total_length);

// Dimension of arrays in batch is untouched, but the values are repeated
// as the side effect of unnesting
let ret = repeat_arrs_from_indices(batch, &take_indices)?;
unnested_temp_arrays
.into_iter()
.zip(list_unnest_specs.iter())
.for_each(|(flatten_arr, unnesting)| {
temp_unnested_arrs.insert(*unnesting, flatten_arr);
});

let repeat_mask: Vec<bool> = batch
.iter()
.enumerate()
.map(|(i, _)| {
// Check if the column is needed in future levels (levels below the current one)
let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
});

// Check if the column is involved in unnesting at any level
let is_involved_in_unnesting = list_type_unnests
.iter()
.any(|unnesting| unnesting.index_in_input_schema == i);

// Repeat columns needed in future levels or not unnested.
needed_in_future_levels || !is_involved_in_unnesting
})
.collect();

// Dimension of arrays in batch is untouched, but the values are repeated
// as the side effect of unnesting
let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

Ok((ret, total_length))
}
struct UnnestingResult {
Expand Down Expand Up @@ -859,8 +879,11 @@ fn create_take_indicies(
builder.finish()
}

/// Create the batch given an arrays and a `indices` array
/// that is used by the take kernel to copy values.
/// Create a batch of arrays based on an input `batch` and a `indices` array.
/// The `indices` array is used by the take kernel to repeat values in the arrays
/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
/// in the `repeat_mask` will be replaced with arrays filled with nulls of the
/// appropriate length.
///
/// For example if we have the following batch:
///
Expand Down Expand Up @@ -890,14 +913,35 @@ fn create_take_indicies(
/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
/// ```
///
/// The `repeat_mask` determines whether an array's values are repeated or replaced with nulls.
/// For example, if the `repeat_mask` is:
///
/// ```ignore
/// [true, false]
/// ```
///
/// The final batch will look like:
///
/// ```ignore
/// c1: 1, null, 2, 3, 4, null, 5, 6 // Repeated using `indices`
/// c2: null, null, null, null, null, null, null, null // Replaced with nulls
///
fn repeat_arrs_from_indices(
batch: &[ArrayRef],
indices: &PrimitiveArray<Int64Type>,
repeat_mask: &[bool],
) -> Result<Vec<Arc<dyn Array>>> {
batch
.iter()
.map(|arr| Ok(kernels::take::take(arr, indices, None)?))
.collect::<Result<_>>()
.zip(repeat_mask.iter())
.map(|(arr, &repeat)| {
if repeat {
Ok(kernels::take::take(arr, indices, None)?)
} else {
Ok(new_null_array(arr.data_type(), arr.len()))
}
})
.collect()
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -853,3 +853,9 @@ select unnest(u.column5), j.* except(column2, column3) from unnest_table u join
1 2 1
3 4 2
NULL NULL 4

## Issue: https://github.com/apache/datafusion/issues/13237
query I
select count(*) from (select unnest(range(0, 100000)) id) t inner join (select unnest(range(0, 100000)) id) t1 on t.id = t1.id;
----
100000

0 comments on commit 189536b

Please sign in to comment.