From 32c62dd6f7e0d0fe5ecc38a0cef0649562b36c6f Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Wed, 20 Dec 2023 22:04:04 +0300 Subject: [PATCH 1/4] maintaining fifo hashmap in hash join --- .../physical-plan/src/joins/hash_join.rs | 188 +++++++++--------- .../src/joins/symmetric_hash_join.rs | 2 + datafusion/physical-plan/src/joins/utils.rs | 78 +++++++- 3 files changed, 171 insertions(+), 97 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 13ac06ee301c..a82a356391c4 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -29,7 +29,6 @@ use crate::joins::utils::{ need_produce_result_in_final, JoinHashMap, JoinHashMapType, }; use crate::{ - coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, expressions::Column, expressions::PhysicalSortExpr, @@ -52,10 +51,10 @@ use super::{ use arrow::array::{ Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array, - UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, + UInt64Array, }; use arrow::compute::kernels::cmp::{eq, not_distinct}; -use arrow::compute::{and, take, FilterBuilder}; +use arrow::compute::{and, concat_batches, take, FilterBuilder}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -715,7 +714,10 @@ async fn collect_left_input( let mut hashmap = JoinHashMap::with_capacity(num_rows); let mut hashes_buffer = Vec::new(); let mut offset = 0; - for batch in batches.iter() { + + // Reverse iteration over build-side input batches allows to create FIFO hashmap + let batches_iter = batches.iter().rev(); + for batch in batches_iter.clone() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); update_hash( @@ -726,19 +728,25 @@ async fn collect_left_input( &random_state, &mut hashes_buffer, 0, + true, )?; offset += batch.num_rows(); } // Merge all batches into a single batch, so we // can directly index into the arrays - let single_batch = concat_batches(&schema, &batches, num_rows)?; + let single_batch = concat_batches(&schema, batches_iter)?; let data = JoinLeftData::new(hashmap, single_batch, reservation); Ok(data) } -/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, -/// assuming that the [RecordBatch] corresponds to the `index`th +/// Updates `hash_map` with new entries from `batch` evaluated against the expressions `on` +/// using `offset` as a start value for `batch` row indices. +/// +/// `fifo_hashmap` sets the order of iteration over `batch` rows while updating hashmap, +/// which allows to keep either first (if set to true) or last (if set to false) row index +/// as a chain head for rows with equal hash values. +#[allow(clippy::too_many_arguments)] pub fn update_hash( on: &[Column], batch: &RecordBatch, @@ -747,6 +755,7 @@ pub fn update_hash( random_state: &RandomState, hashes_buffer: &mut Vec, deleted_offset: usize, + fifo_hashmap: bool, ) -> Result<()> where T: JoinHashMapType, @@ -763,28 +772,18 @@ where // For usual JoinHashmap, the implementation is void. hash_map.extend_zero(batch.num_rows()); - // insert hashes to key of the hashmap - let (mut_map, mut_list) = hash_map.get_mut(); - for (row, hash_value) in hash_values.iter().enumerate() { - let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, index)) = item { - // Already exists: add index to next array - let prev_index = *index; - // Store new value inside hashmap - *index = (row + offset + 1) as u64; - // Update chained Vec at row + offset with previous value - mut_list[row + offset - deleted_offset] = prev_index; - } else { - mut_map.insert( - *hash_value, - // store the value + 1 as 0 value reserved for end of list - (*hash_value, (row + offset + 1) as u64), - |(hash, _)| *hash, - ); - // chained list at (row + offset) is already initialized with 0 - // meaning end of list - } + // Updating JoinHashMap from hash values iterator + let hash_values_iter = hash_values + .iter() + .enumerate() + .map(|(i, val)| (i + offset, val)); + + if fifo_hashmap { + hash_map.update_from_iter(hash_values_iter.rev(), deleted_offset); + } else { + hash_map.update_from_iter(hash_values_iter, deleted_offset); } + Ok(()) } @@ -987,6 +986,7 @@ pub fn build_equal_condition_join_indices( filter: Option<&JoinFilter>, build_side: JoinSide, deleted_offset: Option, + fifo_hashmap: bool, ) -> Result<(UInt64Array, UInt32Array)> { let keys_values = probe_on .iter() @@ -1002,10 +1002,9 @@ pub fn build_equal_condition_join_indices( hashes_buffer.clear(); hashes_buffer.resize(probe_batch.num_rows(), 0); let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - // Using a buffer builder to avoid slower normal builder - let mut build_indices = UInt64BufferBuilder::new(0); - let mut probe_indices = UInt32BufferBuilder::new(0); - // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: + + // In case build-side input has not been inverted while JoinHashMap creation, the chained list algorithm + // will return build indices for each probe row in a reverse order as such: // Build Indices: [5, 4, 3] // Probe Indices: [1, 1, 1] // @@ -1034,44 +1033,17 @@ pub fn build_equal_condition_join_indices( // (5,1) // // With this approach, the lexicographic order on both the probe side and the build side is preserved. - let hash_map = build_hashmap.get_map(); - let next_chain = build_hashmap.get_list(); - for (row, hash_value) in hash_values.iter().enumerate().rev() { - // Get the hash and find it in the build index - - // For every item on the build and probe we check if it matches - // This possibly contains rows with hash collisions, - // So we have to check here whether rows are equal or not - if let Some((_, index)) = - hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) - { - let mut i = *index - 1; - loop { - let build_row_value = if let Some(offset) = deleted_offset { - // This arguments means that we prune the next index way before here. - if i < offset as u64 { - // End of the list due to pruning - break; - } - i - offset as u64 - } else { - i - }; - build_indices.append(build_row_value); - probe_indices.append(row as u32); - // Follow the chain to get the next index value - let next = next_chain[build_row_value as usize]; - if next == 0 { - // end of list - break; - } - i = next - 1; - } - } - } - // Reversing both sets of indices - build_indices.as_slice_mut().reverse(); - probe_indices.as_slice_mut().reverse(); + let (mut probe_indices, mut build_indices) = if fifo_hashmap { + build_hashmap.get_matched_indices(hash_values.iter().enumerate(), deleted_offset) + } else { + let (mut matched_probe, mut matched_build) = build_hashmap + .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset); + + matched_probe.as_slice_mut().reverse(); + matched_build.as_slice_mut().reverse(); + + (matched_probe, matched_build) + }; let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); @@ -1279,6 +1251,7 @@ impl HashJoinStream { self.filter.as_ref(), JoinSide::Left, None, + true, ); let result = match left_right_indices { @@ -1393,7 +1366,9 @@ mod tests { use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder}; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue}; + use datafusion_common::{ + assert_batches_eq, assert_batches_sorted_eq, assert_contains, ScalarValue, + }; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_expr::Operator; @@ -1558,7 +1533,9 @@ mod tests { "| 3 | 5 | 9 | 20 | 5 | 80 |", "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -1611,9 +1588,9 @@ mod tests { async fn join_inner_one_no_shared_column_names() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); let left = build_table( - ("a1", &vec![1, 2, 3]), + ("a1", &vec![1, 3, 2]), ("b1", &vec![4, 5, 5]), // this has a repetition - ("c1", &vec![7, 8, 9]), + ("c1", &vec![7, 9, 8]), ); let right = build_table( ("a2", &vec![10, 20, 30]), @@ -1635,12 +1612,13 @@ mod tests { "| a1 | b1 | c1 | a2 | b2 | c2 |", "+----+----+----+----+----+----+", "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", "| 3 | 5 | 9 | 20 | 5 | 80 |", + "| 2 | 5 | 8 | 20 | 5 | 80 |", "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -1686,7 +1664,8 @@ mod tests { "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -1740,7 +1719,8 @@ mod tests { "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -1789,7 +1769,9 @@ mod tests { "| 1 | 4 | 7 | 10 | 4 | 70 |", "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); // second part let stream = join.execute(1, task_ctx.clone())?; @@ -1804,7 +1786,8 @@ mod tests { "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -2228,12 +2211,14 @@ mod tests { "+----+----+-----+", "| a2 | b2 | c2 |", "+----+----+-----+", - "| 10 | 10 | 100 |", - "| 12 | 10 | 40 |", "| 8 | 8 | 20 |", + "| 12 | 10 | 40 |", + "| 10 | 10 | 100 |", "+----+----+-----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // RightSemi join output is expected to preserve right input order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -2288,12 +2273,14 @@ mod tests { "+----+----+-----+", "| a2 | b2 | c2 |", "+----+----+-----+", - "| 10 | 10 | 100 |", - "| 12 | 10 | 40 |", "| 8 | 8 | 20 |", + "| 12 | 10 | 40 |", + "| 10 | 10 | 100 |", "+----+----+-----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // RightSemi join output is expected to preserve right input order + assert_batches_eq!(expected, &batches); // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9 let filter_expression = Arc::new(BinaryExpr::new( @@ -2314,11 +2301,13 @@ mod tests { "+----+----+-----+", "| a2 | b2 | c2 |", "+----+----+-----+", - "| 10 | 10 | 100 |", "| 12 | 10 | 40 |", + "| 10 | 10 | 100 |", "+----+----+-----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // RightSemi join output is expected to preserve right input order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -2471,12 +2460,14 @@ mod tests { "+----+----+-----+", "| a2 | b2 | c2 |", "+----+----+-----+", + "| 6 | 6 | 60 |", "| 2 | 2 | 80 |", "| 4 | 4 | 120 |", - "| 6 | 6 | 60 |", "+----+----+-----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // RightAnti join output is expected to preserve right input order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -2529,14 +2520,16 @@ mod tests { "+----+----+-----+", "| a2 | b2 | c2 |", "+----+----+-----+", - "| 10 | 10 | 100 |", "| 12 | 10 | 40 |", + "| 6 | 6 | 60 |", "| 2 | 2 | 80 |", + "| 10 | 10 | 100 |", "| 4 | 4 | 120 |", - "| 6 | 6 | 60 |", "+----+----+-----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // RightAnti join output is expected to preserve right input order + assert_batches_eq!(expected, &batches); // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8 let column_indices = vec![ColumnIndex { @@ -2565,13 +2558,15 @@ mod tests { "+----+----+-----+", "| a2 | b2 | c2 |", "+----+----+-----+", + "| 8 | 8 | 20 |", + "| 6 | 6 | 60 |", "| 2 | 2 | 80 |", "| 4 | 4 | 120 |", - "| 6 | 6 | 60 |", - "| 8 | 8 | 20 |", "+----+----+-----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // RightAnti join output is expected to preserve right input order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -2734,6 +2729,7 @@ mod tests { None, JoinSide::Left, None, + false, )?; let mut left_ids = UInt64Builder::with_capacity(0); diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index b9101b57c3e5..d7473c0a3796 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -770,6 +770,7 @@ pub(crate) fn join_with_probe_batch( filter, build_hash_joiner.build_side, Some(build_hash_joiner.deleted_offset), + false, )?; if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) { record_visited_indices( @@ -882,6 +883,7 @@ impl OneSideHashJoiner { random_state, &mut self.hashes_buffer, self.deleted_offset, + false, )?; Ok(()) } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index eae65ce9c26b..fb20e3377730 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -31,7 +31,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Partitioning, Statistics}; use arrow::array::{ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array, - UInt32Builder, UInt64Array, + UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder, }; use arrow::compute; use arrow::datatypes::{Field, Schema, SchemaBuilder}; @@ -151,6 +151,82 @@ pub trait JoinHashMapType { fn get_map(&self) -> &RawTable<(u64, u64)>; /// Returns a reference to the next. fn get_list(&self) -> &Self::NextType; + + /// Updates hashmap from iterator of row indices & row hashes pairs. + fn update_from_iter<'a>( + &mut self, + iter: impl Iterator, + deleted_offset: usize, + ) { + let (mut_map, mut_list) = self.get_mut(); + for (row, hash_value) in iter { + let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); + if let Some((_, index)) = item { + // Already exists: add index to next array + let prev_index = *index; + // Store new value inside hashmap + *index = (row + 1) as u64; + // Update chained Vec at row + offset with previous value + mut_list[row - deleted_offset] = prev_index; + } else { + mut_map.insert( + *hash_value, + // store the value + 1 as 0 value reserved for end of list + (*hash_value, (row + 1) as u64), + |(hash, _)| *hash, + ); + // chained list at (row + offset) is already initialized with 0 + // meaning end of list + } + } + } + + /// Returns all pairs of row indices matched by hash. + /// + /// This method only compares hashes, so additional further check for actual values + /// equality may be required. + fn get_matched_indices<'a>( + &self, + iter: impl Iterator, + deleted_offset: Option, + ) -> (UInt32BufferBuilder, UInt64BufferBuilder) { + let mut input_indices = UInt32BufferBuilder::new(0); + let mut match_indices = UInt64BufferBuilder::new(0); + + let hash_map = self.get_map(); + let next_chain = self.get_list(); + for (row_idx, hash_value) in iter { + // Get the hash and find it in the index + if let Some((_, index)) = + hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) + { + let mut i = *index - 1; + loop { + let match_row_idx = if let Some(offset) = deleted_offset { + // This arguments means that we prune the next index way before here. + if i < offset as u64 { + // End of the list due to pruning + break; + } + i - offset as u64 + } else { + i + }; + match_indices.append(match_row_idx); + input_indices.append(row_idx as u32); + // Follow the chain to get the next index value + let next = next_chain[match_row_idx as usize]; + if next == 0 { + // end of list + break; + } + i = next - 1; + } + } + } + + (input_indices, match_indices) + } } /// Implementation of `JoinHashMapType` for `JoinHashMap`. From c424eb365ab966f2767a8386a23398d3f0a2537f Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Thu, 28 Dec 2023 21:03:20 +0200 Subject: [PATCH 2/4] extended HashJoinExec docstring on build phase --- .../physical-plan/src/joins/hash_join.rs | 46 +++++++++++++++++-- datafusion/physical-plan/src/joins/utils.rs | 4 +- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index a82a356391c4..f560f5b0c951 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -155,8 +155,48 @@ impl JoinLeftData { /// /// Execution proceeds in 2 stages: /// -/// 1. the **build phase** where a hash table is created from the tuples of the -/// build side. +/// 1. the **build phase** creates a hash table from the tuples of the build side, +/// and single concatenated batch containing data from all fetched record batches. +/// Resulting hash table stores hashed join-key fields for each row as a key, and +/// indices of corresponding rows in concatenated batch. +/// +/// Hash join uses LIFO data structure as a hash table, and in order to retain +/// original build-side input order while obtaining data during probe phase, hash +/// table is updated by iterating batch sequence in reverse order -- it allows to +/// keep rows with smaller indices "on the top" of hash table, and still maintain +/// correct indexing for concatenated build-side data batch. +/// +/// Example of build phase for 3 record batches: +/// +/// +/// ```text +/// +/// Original build-side data Inserting build-side values into hashmap Concatenated build-side batch +/// ┌───────────────────────────┐ +/// hasmap.insert(row-hash, row-idx + offset) │ idx │ +/// ┌───────┐ │ ┌───────┐ │ +/// │ Row 1 │ 1) update_hash for batch 3 with offset 0 │ │ Row 6 │ 0 │ +/// Batch 1 │ │ - hashmap.insert(Row 7, idx 1) │ Batch 3 │ │ │ +/// │ Row 2 │ - hashmap.insert(Row 6, idx 0) │ │ Row 7 │ 1 │ +/// └───────┘ │ └───────┘ │ +/// │ │ +/// ┌───────┐ │ ┌───────┐ │ +/// │ Row 3 │ 2) update_hash for batch 2 with offset 2 │ │ Row 3 │ 2 │ +/// │ │ - hashmap.insert(Row 5, idx 4) │ │ │ │ +/// Batch 2 │ Row 4 │ - hashmap.insert(Row 4, idx 3) │ Batch 2 │ Row 4 │ 3 │ +/// │ │ - hashmap.insert(Row 3, idx 2) │ │ │ │ +/// │ Row 5 │ │ │ Row 5 │ 4 │ +/// └───────┘ │ └───────┘ │ +/// │ │ +/// ┌───────┐ │ ┌───────┐ │ +/// │ Row 6 │ 3) update_hash for batch 1 with offset 5 │ │ Row 1 │ 5 │ +/// Batch 3 │ │ - hashmap.insert(Row 2, idx 5) │ Batch 1 │ │ │ +/// │ Row 7 │ - hashmap.insert(Row 1, idx 6) │ │ Row 2 │ 6 │ +/// └───────┘ │ └───────┘ │ +/// │ │ +/// └───────────────────────────┘ +/// +/// ``` /// /// 2. the **probe phase** where the tuples of the probe side are streamed /// through, checking for matches of the join keys in the hash table. @@ -715,7 +755,7 @@ async fn collect_left_input( let mut hashes_buffer = Vec::new(); let mut offset = 0; - // Reverse iteration over build-side input batches allows to create FIFO hashmap + // Updating hashmap starting from the last batch let batches_iter = batches.iter().rev(); for batch in batches_iter.clone() { hashes_buffer.clear(); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index fb20e3377730..ba0e17026a34 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -166,7 +166,7 @@ pub trait JoinHashMapType { let prev_index = *index; // Store new value inside hashmap *index = (row + 1) as u64; - // Update chained Vec at row + offset with previous value + // Update chained Vec at `row` with previous value mut_list[row - deleted_offset] = prev_index; } else { mut_map.insert( @@ -175,7 +175,7 @@ pub trait JoinHashMapType { (*hash_value, (row + 1) as u64), |(hash, _)| *hash, ); - // chained list at (row + offset) is already initialized with 0 + // chained list at `row` is already initialized with 0 // meaning end of list } } From 7311bd9b83c250a97b6e1fd5f9bc33c4fc4fdda3 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Fri, 29 Dec 2023 12:08:41 +0200 Subject: [PATCH 3/4] testcases for randomly ordered build side input --- .../physical-plan/src/joins/hash_join.rs | 93 ++++++++++++++++++- 1 file changed, 91 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index f560f5b0c951..de585ba2f98a 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1628,9 +1628,9 @@ mod tests { async fn join_inner_one_no_shared_column_names() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); let left = build_table( - ("a1", &vec![1, 3, 2]), + ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 5]), // this has a repetition - ("c1", &vec![7, 9, 8]), + ("c1", &vec![7, 8, 9]), ); let right = build_table( ("a2", &vec![10, 20, 30]), @@ -1652,8 +1652,48 @@ mod tests { "| a1 | b1 | c1 | a2 | b2 | c2 |", "+----+----+----+----+----+----+", "| 1 | 4 | 7 | 10 | 4 | 70 |", + "| 2 | 5 | 8 | 20 | 5 | 80 |", + "| 3 | 5 | 9 | 20 | 5 | 80 |", + "+----+----+----+----+----+----+", + ]; + + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); + + Ok(()) + } + + #[tokio::test] + async fn join_inner_one_randomly_ordered() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let left = build_table( + ("a1", &vec![0, 3, 2, 1]), + ("b1", &vec![4, 5, 5, 4]), + ("c1", &vec![6, 9, 8, 7]), + ); + let right = build_table( + ("a2", &vec![20, 30, 10]), + ("b2", &vec![5, 6, 4]), + ("c2", &vec![80, 90, 70]), + ); + let on = vec![( + Column::new_with_schema("b1", &left.schema())?, + Column::new_with_schema("b2", &right.schema())?, + )]; + + let (columns, batches) = + join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + + let expected = [ + "+----+----+----+----+----+----+", + "| a1 | b1 | c1 | a2 | b2 | c2 |", + "+----+----+----+----+----+----+", "| 3 | 5 | 9 | 20 | 5 | 80 |", "| 2 | 5 | 8 | 20 | 5 | 80 |", + "| 0 | 4 | 6 | 10 | 4 | 70 |", + "| 1 | 4 | 7 | 10 | 4 | 70 |", "+----+----+----+----+----+----+", ]; @@ -1765,6 +1805,55 @@ mod tests { Ok(()) } + #[tokio::test] + async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let batch1 = build_table_i32( + ("a1", &vec![0, 3]), + ("b1", &vec![4, 5]), + ("c1", &vec![6, 9]), + ); + let batch2 = build_table_i32( + ("a1", &vec![2, 1]), + ("b1", &vec![5, 4]), + ("c1", &vec![8, 7]), + ); + let schema = batch1.schema(); + let left = Arc::new( + MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None).unwrap(), + ); + let right = build_table( + ("a2", &vec![20, 30, 10]), + ("b2", &vec![5, 6, 4]), + ("c2", &vec![80, 90, 70]), + ); + let on = vec![( + Column::new_with_schema("b1", &left.schema())?, + Column::new_with_schema("b2", &right.schema())?, + )]; + + let (columns, batches) = + join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + + let expected = [ + "+----+----+----+----+----+----+", + "| a1 | b1 | c1 | a2 | b2 | c2 |", + "+----+----+----+----+----+----+", + "| 3 | 5 | 9 | 20 | 5 | 80 |", + "| 2 | 5 | 8 | 20 | 5 | 80 |", + "| 0 | 4 | 6 | 10 | 4 | 70 |", + "| 1 | 4 | 7 | 10 | 4 | 70 |", + "+----+----+----+----+----+----+", + ]; + + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); + + Ok(()) + } + /// Test where the left has 1 part, the right has 2 parts => 2 parts #[tokio::test] async fn join_inner_one_two_parts_right() -> Result<()> { From 00b899755c7b76b154951f43ae04380fe455121f Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Fri, 29 Dec 2023 13:22:02 +0200 Subject: [PATCH 4/4] trigger ci --- datafusion/physical-plan/src/joins/hash_join.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index de585ba2f98a..374a0ad50700 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1819,6 +1819,7 @@ mod tests { ("c1", &vec![8, 7]), ); let schema = batch1.schema(); + let left = Arc::new( MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None).unwrap(), );