Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redundant data copying in unnest #13441

Merged
merged 3 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 53 additions & 9 deletions datafusion/physical-plan/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use arrow::compute::kernels::zip::zip;
use arrow::compute::{cast, is_not_null, kernels, sum};
use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::{Int64Array, Scalar, StructArray};
use arrow_array::{new_null_array, Int64Array, Scalar, StructArray};
use arrow_ord::cmp::lt;
use datafusion_common::{
exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
Expand Down Expand Up @@ -453,16 +453,36 @@ fn list_unnest_at_level(

// Create the take indices array for other columns
let take_indices = create_take_indicies(unnested_length, total_length);

// Dimension of arrays in batch is untouched, but the values are repeated
// as the side effect of unnesting
let ret = repeat_arrs_from_indices(batch, &take_indices)?;
unnested_temp_arrays
.into_iter()
.zip(list_unnest_specs.iter())
.for_each(|(flatten_arr, unnesting)| {
temp_unnested_arrs.insert(*unnesting, flatten_arr);
});

let repeat_mask: Vec<bool> = batch
.iter()
.enumerate()
.map(|(i, _)| {
// Check if the column is needed in future levels (levels below the current one)
let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
});

// Check if the column is involved in unnesting at any level
let is_involved_in_unnesting = list_type_unnests
.iter()
.any(|unnesting| unnesting.index_in_input_schema == i);

// Repeat columns needed in future levels or not unnested.
needed_in_future_levels || !is_involved_in_unnesting
})
.collect();

// Dimension of arrays in batch is untouched, but the values are repeated
// as the side effect of unnesting
let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

Ok((ret, total_length))
}
struct UnnestingResult {
Expand Down Expand Up @@ -859,8 +879,11 @@ fn create_take_indicies(
builder.finish()
}

/// Create the batch given an arrays and a `indices` array
/// that is used by the take kernel to copy values.
/// Create a batch of arrays based on an input `batch` and an `indices` array.
/// The `indices` array is used by the take kernel to repeat values in the arrays
/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
/// in the `repeat_mask` will be replaced with arrays filled with nulls of the
/// appropriate length.
///
/// For example if we have the following batch:
///
Expand Down Expand Up @@ -890,14 +913,35 @@ fn create_take_indicies(
/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
/// ```
///
/// The `repeat_mask` determines whether an array's values are repeated or replaced with nulls.
/// For example, if the `repeat_mask` is:
///
/// ```ignore
/// [true, false]
/// ```
///
/// The final batch will look like:
///
/// ```ignore
/// c1: 1, null, 2, 3, 4, null, 5, 6 // Repeated using `indices`
/// c2: null, null, null, null, null, null, null, null // Replaced with nulls
/// ```
fn repeat_arrs_from_indices(
batch: &[ArrayRef],
indices: &PrimitiveArray<Int64Type>,
repeat_mask: &[bool],
) -> Result<Vec<Arc<dyn Array>>> {
batch
.iter()
.map(|arr| Ok(kernels::take::take(arr, indices, None)?))
.collect::<Result<_>>()
.zip(repeat_mask.iter())
.map(|(arr, &repeat)| {
if repeat {
Ok(kernels::take::take(arr, indices, None)?)
} else {
Ok(new_null_array(arr.data_type(), arr.len()))
}
})
.collect()
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -853,3 +853,9 @@ select unnest(u.column5), j.* except(column2, column3) from unnest_table u join
1 2 1
3 4 2
NULL NULL 4

## Issue: https://github.com/apache/datafusion/issues/13237
query I
select count(*) from (select unnest(range(0, 100000)) id) t inner join (select unnest(range(0, 100000)) id) t1 on t.id = t1.id;
----
100000