Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into issue_12117
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 6, 2024
2 parents 9adc842 + 923f098 commit 4c9e40d
Show file tree
Hide file tree
Showing 316 changed files with 9,487 additions and 5,427 deletions.
53 changes: 0 additions & 53 deletions .github/workflows/pr_comment.yml

This file was deleted.

27 changes: 15 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

[workspace]
# datafusion-cli is excluded because of its Cargo.lock. See datafusion-cli/README.md.
exclude = ["datafusion-cli", "dev/depcheck"]
members = [
"datafusion/common",
Expand All @@ -33,7 +34,6 @@ members = [
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/physical-expr-common",
"datafusion/physical-expr-functions-aggregate",
"datafusion/physical-optimizer",
"datafusion/physical-plan",
"datafusion/proto",
Expand Down Expand Up @@ -69,22 +69,22 @@ version = "41.0.0"
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
arrow = { version = "52.2.0", features = [
arrow = { version = "53.0.0", features = [
"prettyprint",
] }
arrow-array = { version = "52.2.0", default-features = false, features = [
arrow-array = { version = "53.0.0", default-features = false, features = [
"chrono-tz",
] }
arrow-buffer = { version = "52.2.0", default-features = false }
arrow-flight = { version = "52.2.0", features = [
arrow-buffer = { version = "53.0.0", default-features = false }
arrow-flight = { version = "53.0.0", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "52.2.0", default-features = false, features = [
arrow-ipc = { version = "53.0.0", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "52.2.0", default-features = false }
arrow-schema = { version = "52.2.0", default-features = false }
arrow-string = { version = "52.2.0", default-features = false }
arrow-ord = { version = "53.0.0", default-features = false }
arrow-schema = { version = "53.0.0", default-features = false }
arrow-string = { version = "53.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
Expand All @@ -106,7 +106,6 @@ datafusion-functions-window = { path = "datafusion/functions-window", version =
datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false }
datafusion-physical-expr-functions-aggregate = { path = "datafusion/physical-expr-functions-aggregate", version = "41.0.0" }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "41.0.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "41.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "41.0.0" }
Expand All @@ -123,13 +122,17 @@ indexmap = "2.0.0"
itertools = "0.13"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.10.2", default-features = false }
object_store = { version = "0.11.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "52.2.0", default-features = false, features = [
parquet = { version = "53.0.0", default-features = false, features = [
"arrow",
"async",
"object_store",
] }
pbjson = { version = "0.7.0" }
# Should match arrow-flight's version of prost.
prost = "0.13.1"
prost-derive = "0.13.1"
rand = "0.8"
regex = "1.8"
rstest = "0.22.0"
Expand Down
38 changes: 19 additions & 19 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench "inspired" queries against a single parquet (DataFusion specific)
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
**********
* Supported Configuration (Environment Variables)
Expand Down Expand Up @@ -106,7 +106,7 @@ while [[ $# -gt 0 ]]; do
shift # past argument
usage
;;
-*|--*)
-*)
echo "Unknown option $1"
exit 1
;;
Expand Down Expand Up @@ -175,7 +175,7 @@ main() {
run)
# Parse positional parameters
BENCHMARK=${ARG2:-"${BENCHMARK}"}
BRANCH_NAME=$(cd ${DATAFUSION_DIR} && git rev-parse --abbrev-ref HEAD)
BRANCH_NAME=$(cd "${DATAFUSION_DIR}" && git rev-parse --abbrev-ref HEAD)
BRANCH_NAME=${BRANCH_NAME//\//_} # mind blowing syntax to replace / with _
RESULTS_NAME=${RESULTS_NAME:-"${BRANCH_NAME}"}
RESULTS_DIR=${RESULTS_DIR:-"$SCRIPT_DIR/results/$RESULTS_NAME"}
Expand All @@ -189,7 +189,7 @@ main() {
echo "DATA_DIR: ${DATA_DIR}"
echo "RESULTS_DIR: ${RESULTS_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN": ${PREFER_HASH_JOIN}
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "***************************"

# navigate to the appropriate directory
Expand Down Expand Up @@ -288,7 +288,7 @@ data_tpch() {
echo " tbl files exist ($FILE exists)."
else
echo " creating tbl files with tpch_dbgen..."
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s ${SCALE_FACTOR}
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s "${SCALE_FACTOR}"
fi

# Copy expected answers into the ./data/answers directory if it does not already exist
Expand Down Expand Up @@ -325,7 +325,7 @@ run_tpch() {
RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}"
}

# Runs the tpch in memory
Expand All @@ -341,23 +341,23 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} -m --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}"
}

# Runs the parquet filter benchmark
run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}


Expand All @@ -369,7 +369,7 @@ data_clickbench_1() {
pushd "${DATA_DIR}" > /dev/null

# Avoid downloading if it already exists and is the right size
OUTPUT_SIZE=`wc -c hits.parquet 2>/dev/null | awk '{print $1}' || true`
OUTPUT_SIZE=$(wc -c hits.parquet 2>/dev/null | awk '{print $1}' || true)
echo -n "Checking hits.parquet..."
if test "${OUTPUT_SIZE}" = "14779976446"; then
echo -n "... found ${OUTPUT_SIZE} bytes ..."
Expand All @@ -393,7 +393,7 @@ data_clickbench_partitioned() {
pushd "${DATA_DIR}/hits_partitioned" > /dev/null

echo -n "Checking hits_partitioned..."
OUTPUT_SIZE=`wc -c * 2>/dev/null | tail -n 1 | awk '{print $1}' || true`
OUTPUT_SIZE=$(wc -c -- * 2>/dev/null | tail -n 1 | awk '{print $1}' || true)
if test "${OUTPUT_SIZE}" = "14737666736"; then
echo -n "... found ${OUTPUT_SIZE} bytes ..."
else
Expand All @@ -411,23 +411,23 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o "${RESULTS_FILE}"
}

# Runs the clickbench benchmark with the partitioned parquet files
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o "${RESULTS_FILE}"
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o "${RESULTS_FILE}"
}

compare_benchmarks() {
Expand All @@ -447,12 +447,12 @@ compare_benchmarks() {
fi

echo "Comparing ${BRANCH1} and ${BRANCH2}"
for bench in `ls ${BASE_RESULTS_DIR}/${BRANCH1}` ; do
RESULTS_FILE1="${BASE_RESULTS_DIR}/${BRANCH1}/${bench}"
RESULTS_FILE2="${BASE_RESULTS_DIR}/${BRANCH2}/${bench}"
for RESULTS_FILE1 in "${BASE_RESULTS_DIR}/${BRANCH1}"/*.json ; do
BENCH=$(basename "${RESULTS_FILE1}")
RESULTS_FILE2="${BASE_RESULTS_DIR}/${BRANCH2}/${BENCH}"
if test -f "${RESULTS_FILE2}" ; then
echo "--------------------"
echo "Benchmark ${bench}"
echo "Benchmark ${BENCH}"
echo "--------------------"
PATH=$VIRTUAL_ENV/bin:$PATH python3 "${SCRIPT_DIR}"/compare.py "${RESULTS_FILE1}" "${RESULTS_FILE2}"
else
Expand All @@ -463,7 +463,7 @@ compare_benchmarks() {
}

setup_venv() {
python3 -m venv $VIRTUAL_ENV
python3 -m venv "$VIRTUAL_ENV"
PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
}

Expand Down
15 changes: 15 additions & 0 deletions benchmarks/queries/clickbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ LIMIT 10;
```


### Q3: What is the income distribution for users in specific regions

**Question**: "What regions and social networks have the highest variance of parameter price

**Important Query Properties**: STDDEV and VAR aggregation functions, GROUP BY multiple small ints

```sql
SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPrice"), STDDEV("ParamPrice") as s, VAR("ParamPrice")
FROM 'hits.parquet'
GROUP BY "SocialSourceNetworkID", "RegionID"
HAVING s IS NOT NULL
ORDER BY s DESC
LIMIT 10;
```

## Data Notes

Here are some interesting statistics about the data used in the queries
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/queries/clickbench/extended.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel") FROM hits;
SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage") FROM hits;
SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPrice"), STDDEV("ParamPrice") as s, VAR("ParamPrice") FROM hits GROUP BY "SocialSourceNetworkID", "RegionID" HAVING s IS NOT NULL ORDER BY s DESC LIMIT 10;
Loading

0 comments on commit 4c9e40d

Please sign in to comment.