From 5865f7df36e1fd90fafd40d96900c29749abe470 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 29 Apr 2024 16:56:14 -0700 Subject: [PATCH 1/5] Fix: Sort Merge Join crashes on TPCH Q21 --- datafusion/physical-plan/src/joins/sort_merge_join.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 8da345cdfca6..eaa03eacf4f5 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1415,6 +1415,9 @@ fn get_filter_column( .map(|i| buffered_columns[i.index].clone()) .collect::>(); +// dbg!(&left_columns); +// dbg!(&right_columns); + filter_columns.extend(left_columns); filter_columns.extend(right_columns); } From cb4eb2b6d58adc75638aea7c99f74438e04c6e11 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 30 May 2024 08:42:29 -0700 Subject: [PATCH 2/5] Fix LeftAnti SMJ join when the join filter is set --- datafusion/physical-plan/src/joins/sort_merge_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index eaa03eacf4f5..8dad3eadc7ac 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1415,8 +1415,8 @@ fn get_filter_column( .map(|i| buffered_columns[i.index].clone()) .collect::>(); -// dbg!(&left_columns); -// dbg!(&right_columns); + // dbg!(&left_columns); + // dbg!(&right_columns); filter_columns.extend(left_columns); filter_columns.extend(right_columns); From b66822e93982c104c98cdc28c8b5622147bedb1d Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 31 May 2024 08:35:50 -0700 Subject: [PATCH 3/5] rm dbg --- datafusion/physical-plan/src/joins/sort_merge_join.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 8dad3eadc7ac..8da345cdfca6 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1415,9 +1415,6 @@ fn get_filter_column( .map(|i| buffered_columns[i.index].clone()) .collect::>(); - // dbg!(&left_columns); - // dbg!(&right_columns); - filter_columns.extend(left_columns); filter_columns.extend(right_columns); } From f1df3bb3aca86827eb99c9f44601330a6041fb0e Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 5 Jun 2024 16:58:31 -0700 Subject: [PATCH 4/5] Bench: Add `PREFER_HASH_JOIN` env variable --- benchmarks/README.md | 10 ++++++-- benchmarks/bench.sh | 56 ++++++++++++++------------------------------ 2 files changed, 26 insertions(+), 40 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index b402dd6ea048..8c666c71925c 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -67,6 +67,13 @@ Create / download a specific dataset (TPCH) Data is placed in the `data` subdirectory. +## Change join algorithm +The benchmark runs with `prefer_hash_join == true` by default, which enforces HASH join algorithm. +To run TPCH benchmarks with other than HASH join: +```shell +PREFER_HASH_JOIN=false ./bench.sh run tpch +``` + ## Comparing performance of main and a branch ```shell @@ -177,7 +184,6 @@ The benchmark program also supports CSV and Parquet input file formats and a uti ```bash cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet ``` - Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`. 
### Comparing results between runs @@ -261,7 +267,7 @@ SUBCOMMANDS: # Benchmarks -The output of `dfbench` help includes a descripion of each benchmark, which is reproducedd here for convenience +The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience ## ClickBench diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 87d0720ccb63..77779a12c450 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -36,6 +36,7 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..} DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data} #CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"} CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # for faster iterations +PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true} usage() { echo " @@ -52,8 +53,8 @@ Examples: # Create the datasets for all benchmarks in $DATA_DIR ./bench.sh data -# Run the 'tpch' benchmark on the datafusion checkout in /source/arrow-datafusion -DATAFUSION_DIR=/source/arrow-datafusion ./bench.sh run tpch +# Run the 'tpch' benchmark on the datafusion checkout in /source/datafusion +DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch ********** * Commands @@ -67,10 +68,8 @@ compare: Compares results from benchmark runs ********** all(default): Data/Run/Compare for all benchmarks tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, hash join -tpch_smj: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, sort merge join tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, hash join -tpch_smj10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, sort merge join tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory parquet: Benchmark of parquet reader's filtering speed sort: Benchmark of sorting 
speed @@ -81,10 +80,11 @@ clickbench_extended: ClickBench "inspired" queries against a single parquet ( ********** * Supported Configuration (Environment Variables) ********** -DATA_DIR directory to store datasets -CARGO_COMMAND command that runs the benchmark binary -DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) -RESULTS_NAME folder where the benchmark files are stored +DATA_DIR directory to store datasets +CARGO_COMMAND command that runs the benchmark binary +DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) +RESULTS_NAME folder where the benchmark files are stored +PREFER_HASH_JOIN Prefer hash join algorithm(default true) " exit 1 } @@ -131,6 +131,7 @@ main() { echo "BENCHMARK: ${BENCHMARK}" echo "DATA_DIR: ${DATA_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" + echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}" echo "***************************" case "$BENCHMARK" in all) @@ -185,6 +186,7 @@ main() { echo "DATA_DIR: ${DATA_DIR}" echo "RESULTS_DIR: ${RESULTS_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" + echo "PREFER_HASH_JOIN": ${PREFER_HASH_JOIN} echo "***************************" # navigate to the appropriate directory @@ -215,12 +217,6 @@ main() { tpch_mem10) run_tpch_mem "10" ;; - tpch_smj) - run_tpch_smj "1" - ;; - tpch_smj10) - run_tpch_smj "10" - ;; parquet) run_parquet ;; @@ -306,7 +302,7 @@ data_tpch() { else echo " creating parquet files using benchmark binary ..." pushd "${SCRIPT_DIR}" > /dev/null - $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet + $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --output "${TPCH_DIR}" --format parquet popd > /dev/null fi } @@ -323,22 +319,7 @@ run_tpch() { RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch benchmark..." 
- $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE} -} - -# Runs the tpch benchmark with sort merge join -run_tpch_smj() { - SCALE_FACTOR=$1 - if [ -z "$SCALE_FACTOR" ] ; then - echo "Internal error: Scale factor not specified" - exit 1 - fi - TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}" - - RESULTS_FILE="${RESULTS_DIR}/tpch_smj_sf${SCALE_FACTOR}.json" - echo "RESULTS_FILE: ${RESULTS_FILE}" - echo "Running tpch SMJ benchmark..." - $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join false --format parquet -o ${RESULTS_FILE} + $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --format parquet -o ${RESULTS_FILE} } # Runs the tpch in memory @@ -354,7 +335,7 @@ run_tpch_mem() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch_mem benchmark..." # -m means in memory - $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE} + $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} -m --format parquet -o ${RESULTS_FILE} } # Runs the parquet filter benchmark @@ -362,7 +343,7 @@ run_parquet() { RESULTS_FILE="${RESULTS_DIR}/parquet.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running parquet filter benchmark..." - $CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE} + $CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE} } # Runs the sort benchmark @@ -370,7 +351,7 @@ run_sort() { RESULTS_FILE="${RESULTS_DIR}/sort.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running sort benchmark..." 
- $CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE} + $CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE} } @@ -424,7 +405,7 @@ run_clickbench_1() { RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) benchmark..." - $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} + $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} } # Runs the clickbench benchmark with the partitioned parquet files @@ -432,7 +413,7 @@ run_clickbench_partitioned() { RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark..." - $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} + $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} } # Runs the clickbench "extended" benchmark with a single large parquet file @@ -440,10 +421,9 @@ run_clickbench_extended() { RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) extended benchmark..." 
- $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE} + $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE} } - compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" BRANCH1="$1" From a3c11f9764c515a7725ea4f3f4bd11c516d599ea Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 5 Jun 2024 17:05:38 -0700 Subject: [PATCH 5/5] Bench: Add `PREFER_HASH_JOIN` env variable --- benchmarks/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index 8c666c71925c..afaf28bb7576 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -67,9 +67,9 @@ Create / download a specific dataset (TPCH) Data is placed in the `data` subdirectory. -## Change join algorithm +## Select join algorithm The benchmark runs with `prefer_hash_join == true` by default, which enforces HASH join algorithm. -To run TPCH benchmarks with other than HASH join: +To run TPCH benchmarks with a join algorithm other than HASH: ```shell PREFER_HASH_JOIN=false ./bench.sh run tpch ```