Skip to content

Commit

Permalink
hash join: add build-side join keys to memory accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Jan 21, 2025
1 parent 2f28327 commit 290e54b
Showing 1 changed file with 71 additions and 17 deletions.
88 changes: 71 additions & 17 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ use arrow::array::{
Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, UInt64Array,
};
use arrow::compute::kernels::cmp::{eq, not_distinct};
use arrow::compute::{and, concat_batches, take, FilterBuilder};
use arrow::compute::{and, concat, concat_batches, take, FilterBuilder};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_array::cast::downcast_array;
use arrow_array::new_empty_array;
use arrow_schema::ArrowError;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
Expand Down Expand Up @@ -896,16 +897,18 @@ async fn collect_left_input(
// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
// 2. stores the batches in a vector.
let initial = (Vec::new(), 0, metrics, reservation);
let (batches, num_rows, metrics, mut reservation) = stream
let initial = (Vec::new(), 0, 0, metrics, reservation);
let (batches, num_rows, batches_size, metrics, mut reservation) = stream
.try_fold(initial, |mut acc, batch| async {
let batch_size = get_record_batch_memory_size(&batch);
// Reserve memory for incoming batch
acc.3.try_grow(batch_size)?;
acc.4.try_grow(batch_size)?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
acc.2.build_input_rows.add(batch.num_rows());
acc.3.build_mem_used.add(batch_size);
acc.3.build_input_batches.add(1);
acc.3.build_input_rows.add(batch.num_rows());
// Update total batches size
acc.2 += batch_size;
// Update row count
acc.1 += batch.num_rows();
// Push batch to output
Expand All @@ -914,6 +917,56 @@ async fn collect_left_input(
})
.await?;

let batches_iter = batches.iter().rev();

// Collecting build-side join keys values
let left_values = if batches.is_empty() {
on_left
.iter()
.map(|expr| Ok(new_empty_array(&expr.data_type(&schema)?)))
.collect::<Result<Vec<_>>>()?
} else {
on_left
.iter()
.map(|expr| {
// Evaluate build-side join key expression on each batch, gradually
// reserving additional memory
let mut join_key_size = 0;
let join_key_arrays = batches_iter
.clone()
.map(|batch| {
let array: Arc<dyn Array> =
expr.evaluate(batch).unwrap().into_array(batch.num_rows())?;
let array_size = array.get_array_memory_size();
reservation.try_grow(array_size)?;
join_key_size += array_size;
Ok(array)
})
.collect::<Result<Vec<_>>>()?;

// Merge all keys into a single array, reserving memory for the copy,
// of source arrays (assuming worst case scenario)
reservation.try_grow(join_key_size)?;
let concatenated = concat(
join_key_arrays
.iter()
.map(AsRef::as_ref)
.collect::<Vec<_>>()
.as_ref(),
)?;

// Drop unconcatenated data and adjust memory reservation for single array
drop(join_key_arrays);
let build_side_mem = reservation.size() - join_key_size * 2
+ concatenated.get_array_memory_size();
reservation.resize(build_side_mem);
metrics.build_mem_used.set(build_side_mem);

Ok(concatenated)
})
.collect::<Result<Vec<_>>>()?
};

// Estimation of memory size, required for hashtable, prior to allocation.
// Final result can be verified using `RawTable.allocation_info()`
let fixed_size = size_of::<JoinHashMap>();
Expand All @@ -928,7 +981,6 @@ async fn collect_left_input(
let mut offset = 0;

// Updating hashmap starting from the last batch
let batches_iter = batches.iter().rev();
for batch in batches_iter.clone() {
hashes_buffer.clear();
hashes_buffer.resize(batch.num_rows(), 0);
Expand All @@ -944,9 +996,19 @@ async fn collect_left_input(
)?;
offset += batch.num_rows();
}
// Merge all batches into a single batch, so we can directly index into the arrays

// Merge all batches into a single batch, so we can directly index into the arrays,
// with prior reserving memory for the copy of source batches (assuming worst case scenario)
reservation.try_grow(batches_size)?;
let single_batch = concat_batches(&schema, batches_iter)?;

// Drop original batches and adjust memory reservation for a single batch
drop(batches);
let build_side_mem = reservation.size() - batches_size * 2
+ get_record_batch_memory_size(&single_batch);
reservation.resize(build_side_mem);
metrics.build_mem_used.set(build_side_mem);

// Reserve additional memory for visited indices bitmap and create shared builder
let visited_indices_bitmap = if with_visited_indices_bitmap {
let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
Expand All @@ -960,14 +1022,6 @@ async fn collect_left_input(
BooleanBufferBuilder::new(0)
};

let left_values = on_left
.iter()
.map(|c| {
c.evaluate(&single_batch)?
.into_array(single_batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;

let data = JoinLeftData::new(
hashmap,
single_batch,
Expand Down

0 comments on commit 290e54b

Please sign in to comment.