Commit

Merge remote-tracking branch 'apache/main' into fix-regexp_match
alamb committed Oct 18, 2024
2 parents 2877e7d + 3405234 commit 4d90ff1
Showing 7 changed files with 1,220 additions and 621 deletions.
31 changes: 30 additions & 1 deletion datafusion/core/src/physical_optimizer/join_selection.rs
@@ -183,13 +183,15 @@ pub fn swap_hash_join(
partition_mode,
hash_join.null_equals_null(),
)?;
// In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
if matches!(
hash_join.join_type(),
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
) {
) || hash_join.projection.is_some()
{
Ok(Arc::new(new_join))
} else {
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
@@ -1287,6 +1289,33 @@ mod tests_statistical {
);
}

#[tokio::test]
async fn test_hash_join_swap_on_joins_with_projections() -> Result<()> {
let (big, small) = create_big_and_small();
let join = Arc::new(HashJoinExec::try_new(
Arc::clone(&big),
Arc::clone(&small),
vec![(
Arc::new(Column::new_with_schema("big_col", &big.schema())?),
Arc::new(Column::new_with_schema("small_col", &small.schema())?),
)],
None,
&JoinType::Inner,
Some(vec![1]),
PartitionMode::Partitioned,
false,
)?);
let swapped = swap_hash_join(&join.clone(), PartitionMode::Partitioned)
.expect("swap_hash_join must support joins with projections");
let swapped_join = swapped.as_any().downcast_ref::<HashJoinExec>().expect(
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
);
assert_eq!(swapped_join.projection, Some(vec![0_usize]));
assert_eq!(swapped.schema().fields.len(), 1);
assert_eq!(swapped.schema().fields[0].name(), "small_col");
Ok(())
}

#[tokio::test]
async fn test_swap_reverting_projection() {
let left_schema = Schema::new(vec![
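A note on the projection indices asserted in the new test above: the un-swapped join carries an embedded projection Some(vec![1]) over the combined big/small schema, and after swap_hash_join the test expects Some(vec![0]) with a single output field named "small_col". The following is a minimal, hypothetical sketch of that index remapping, assuming each test input exposes one column (as the assertions suggest); the helper name is illustrative and is not part of DataFusion's API.

// Hypothetical helper, not DataFusion code: remap an embedded projection
// defined over the old `left ++ right` schema onto the swapped
// `right ++ left` schema so it keeps selecting the same columns.
fn remap_projection(projection: &[usize], left_len: usize, right_len: usize) -> Vec<usize> {
    projection
        .iter()
        .map(|&i| {
            if i < left_len {
                // Old-left column: it now sits after the new left (the old right).
                i + right_len
            } else {
                // Old-right column: it now leads the combined schema.
                i - left_len
            }
        })
        .collect()
}

fn main() {
    // Mirrors the test: Some(vec![1]) over big (1 column) ++ small (1 column)
    // becomes Some(vec![0]) after the swap, i.e. it still selects "small_col".
    assert_eq!(remap_projection(&[1_usize], 1, 1), vec![0_usize]);
}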
12 changes: 4 additions & 8 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -125,16 +125,14 @@ async fn test_left_join_1k() {
}

#[tokio::test]
// flaky for HjSmj case
// https://github.com/apache/datafusion/issues/12359
async fn test_left_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Left,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[JoinTestType::NljHj], false)
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
.await
}

@@ -229,6 +227,7 @@ async fn test_anti_join_1k() {
#[tokio::test]
// flaky for HjSmj case, giving 1 rows difference sometimes
// https://github.com/apache/datafusion/issues/11555
#[ignore]
async fn test_anti_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
@@ -515,14 +514,11 @@ impl JoinFuzzTestCase {
"input2",
);

if join_tests.contains(&JoinTestType::NljHj)
&& join_tests.contains(&JoinTestType::NljHj)
&& nlj_rows != hj_rows
{
if join_tests.contains(&JoinTestType::NljHj) && nlj_rows != hj_rows {
println!("=============== HashJoinExec ==================");
hj_formatted_sorted.iter().for_each(|s| println!("{}", s));
println!("=============== NestedLoopJoinExec ==================");
smj_formatted_sorted.iter().for_each(|s| println!("{}", s));
nlj_formatted_sorted.iter().for_each(|s| println!("{}", s));

Self::save_partitioned_batches_as_parquet(
&nlj_collected,