Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into issue_5265
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 3, 2024
2 parents 95e635f + 85f92ef commit 8001eed
Show file tree
Hide file tree
Showing 395 changed files with 16,314 additions and 8,838 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
under the License.
-->

* [DataFusion CHANGELOG](./datafusion/CHANGELOG.md)
Change logs for each release can be found [here](dev/changelog).


For older versions, see [apache/arrow/CHANGELOG.md](https://github.com/apache/arrow/blob/master/CHANGELOG.md).
51 changes: 27 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ members = [
"datafusion/expr",
"datafusion/expr-common",
"datafusion/execution",
"datafusion/ffi",
"datafusion/functions",
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
Expand Down Expand Up @@ -59,7 +60,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.79"
version = "42.1.0"
version = "42.2.0"

[workspace.dependencies]
# We turn off default-features for some dependencies here so the workspaces which inherit them can
Expand Down Expand Up @@ -92,29 +93,30 @@ bytes = "1.4"
chrono = { version = "0.4.38", default-features = false }
ctor = "0.2.0"
dashmap = "6.0.1"
datafusion = { path = "datafusion/core", version = "42.1.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "42.1.0" }
datafusion-common = { path = "datafusion/common", version = "42.1.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "42.1.0" }
datafusion-execution = { path = "datafusion/execution", version = "42.1.0" }
datafusion-expr = { path = "datafusion/expr", version = "42.1.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "42.1.0" }
datafusion-functions = { path = "datafusion/functions", version = "42.1.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "42.1.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "42.1.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "42.1.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "42.1.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "42.1.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "42.1.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "42.1.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "42.1.0", default-features = false }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "42.1.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "42.1.0" }
datafusion-proto = { path = "datafusion/proto", version = "42.1.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "42.1.0" }
datafusion-sql = { path = "datafusion/sql", version = "42.1.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "42.1.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "42.1.0" }
datafusion = { path = "datafusion/core", version = "42.2.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "42.2.0" }
datafusion-common = { path = "datafusion/common", version = "42.2.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "42.2.0" }
datafusion-execution = { path = "datafusion/execution", version = "42.2.0" }
datafusion-expr = { path = "datafusion/expr", version = "42.2.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "42.2.0" }
datafusion-ffi = { path = "datafusion/ffi", version = "42.2.0" }
datafusion-functions = { path = "datafusion/functions", version = "42.2.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "42.2.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "42.2.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "42.2.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "42.2.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "42.2.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "42.2.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "42.2.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "42.2.0", default-features = false }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "42.2.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "42.2.0" }
datafusion-proto = { path = "datafusion/proto", version = "42.2.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "42.2.0" }
datafusion-sql = { path = "datafusion/sql", version = "42.2.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "42.2.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "42.2.0" }
doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
Expand Down Expand Up @@ -169,3 +171,4 @@ large_futures = "warn"

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unused_qualifications = "deny"
38 changes: 38 additions & 0 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,16 @@ steps.
The tests sort the entire dataset using several different sort
orders.

## IMDB

Run Join Order Benchmark (JOB) on IMDB dataset.

The Internet Movie Database (IMDB) dataset contains real-world movie data. Unlike synthetic datasets like TPCH, which assume uniform data distribution and uncorrelated columns, the IMDB dataset includes skewed data and correlated columns (which are common for real dataset), making it more suitable for testing query optimizers, particularly for cardinality estimation.

This benchmark is derived from [Join Order Benchmark](https://github.com/gregrahn/join-order-benchmark).

See paper [How Good Are Query Optimizers, Really](http://www.vldb.org/pvldb/vol9/p204-leis.pdf) for more details.

## TPCH

Run the tpch benchmark.
Expand All @@ -342,6 +352,34 @@ This benchmarks is derived from the [TPC-H][1] version
[2]: https://github.com/databricks/tpch-dbgen.git,
[2.17.1]: https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf

## External Aggregation

Run the benchmark for aggregations with limited memory.

When the memory limit is exceeded, the aggregation intermediate results will be spilled to disk, and finally read back with sort-merge.

External aggregation benchmarks run several aggregation queries with different memory limits, on TPCH `lineitem` table. Queries can be found in [`external_aggr.rs`](src/bin/external_aggr.rs).

This benchmark is inspired by [DuckDB's external aggregation paper](https://hannes.muehleisen.org/publications/icde2024-out-of-core-kuiper-boncz-muehleisen.pdf), specifically Section VI.

### External Aggregation Example Runs
1. Run all queries with predefined memory limits:
```bash
# Under 'benchmarks/' directory
cargo run --release --bin external_aggr -- benchmark -n 4 --iterations 3 -p '....../data/tpch_sf1' -o '/tmp/aggr.json'
```

2. Run a query with specific memory limit:
```bash
cargo run --release --bin external_aggr -- benchmark -n 4 --iterations 3 -p '....../data/tpch_sf1' -o '/tmp/aggr.json' --query 1 --memory-limit 30M
```

3. Run all queries with `bench.sh` script:
```bash
./bench.sh data external_aggr
./bench.sh run external_aggr
```


# Older Benchmarks

Expand Down
29 changes: 26 additions & 3 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ sort: Benchmark of sorting speed
clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
external_aggr: External aggregation benchmark
**********
* Supported Configuration (Environment Variables)
Expand Down Expand Up @@ -170,6 +171,10 @@ main() {
imdb)
data_imdb
;;
external_aggr)
# same data as for tpch
data_tpch "1"
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
usage
Expand Down Expand Up @@ -212,6 +217,7 @@ main() {
run_clickbench_partitioned
run_clickbench_extended
run_imdb
run_external_aggr
;;
tpch)
run_tpch "1"
Expand Down Expand Up @@ -243,6 +249,9 @@ main() {
imdb)
run_imdb
;;
external_aggr)
run_external_aggr
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
usage
Expand Down Expand Up @@ -357,15 +366,15 @@ run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}


Expand Down Expand Up @@ -524,7 +533,21 @@ run_imdb() {
$CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}"
}


# Runs the external aggregation benchmark
run_external_aggr() {
# Use TPC-H SF1 dataset
TPCH_DIR="${DATA_DIR}/tpch_sf1"
RESULTS_FILE="${RESULTS_DIR}/external_aggr.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running external aggregation benchmark..."

# Only parquet is supported.
# Since per-operator memory limit is calculated as (total-memory-limit /
# number-of-partitions), and by default `--partitions` is set to number of
# CPU cores, we set a constant number of partitions to prevent this
# benchmark to fail on some machines.
$CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
}


compare_benchmarks() {
Expand Down
Loading

0 comments on commit 8001eed

Please sign in to comment.