Skip to content

Commit

Permalink
Merge branch 'main' into sqlparse-interval
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin committed Sep 12, 2024
2 parents edf318b + 5b6b404 commit ab88318
Show file tree
Hide file tree
Showing 399 changed files with 14,746 additions and 6,676 deletions.
53 changes: 0 additions & 53 deletions .github/workflows/pr_comment.yml

This file was deleted.

27 changes: 15 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

[workspace]
# datafusion-cli is excluded because of its Cargo.lock. See datafusion-cli/README.md.
exclude = ["datafusion-cli", "dev/depcheck"]
members = [
"datafusion/common",
Expand All @@ -33,7 +34,6 @@ members = [
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/physical-expr-common",
"datafusion/physical-expr-functions-aggregate",
"datafusion/physical-optimizer",
"datafusion/physical-plan",
"datafusion/proto",
Expand Down Expand Up @@ -69,22 +69,22 @@ version = "41.0.0"
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
arrow = { version = "52.2.0", features = [
arrow = { version = "53.0.0", features = [
"prettyprint",
] }
arrow-array = { version = "52.2.0", default-features = false, features = [
arrow-array = { version = "53.0.0", default-features = false, features = [
"chrono-tz",
] }
arrow-buffer = { version = "52.2.0", default-features = false }
arrow-flight = { version = "52.2.0", features = [
arrow-buffer = { version = "53.0.0", default-features = false }
arrow-flight = { version = "53.0.0", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "52.2.0", default-features = false, features = [
arrow-ipc = { version = "53.0.0", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "52.2.0", default-features = false }
arrow-schema = { version = "52.2.0", default-features = false }
arrow-string = { version = "52.2.0", default-features = false }
arrow-ord = { version = "53.0.0", default-features = false }
arrow-schema = { version = "53.0.0", default-features = false }
arrow-string = { version = "53.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
Expand All @@ -106,7 +106,6 @@ datafusion-functions-window = { path = "datafusion/functions-window", version =
datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false }
datafusion-physical-expr-functions-aggregate = { path = "datafusion/physical-expr-functions-aggregate", version = "41.0.0" }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "41.0.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "41.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "41.0.0" }
Expand All @@ -123,13 +122,17 @@ indexmap = "2.0.0"
itertools = "0.13"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.10.2", default-features = false }
object_store = { version = "0.11.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "52.2.0", default-features = false, features = [
parquet = { version = "53.0.0", default-features = false, features = [
"arrow",
"async",
"object_store",
] }
pbjson = { version = "0.7.0" }
# Should match arrow-flight's version of prost.
prost = "0.13.1"
prost-derive = "0.13.1"
rand = "0.8"
regex = "1.8"
rstest = "0.22.0"
Expand Down
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
[discord-badge]: https://img.shields.io/discord/885562378132000778.svg?logo=discord&style=flat-square
[discord-url]: https://discord.com/invite/Qw5gKqHxUM

[Website](https://github.com/apache/datafusion) |
[Guides](https://github.com/apache/datafusion/tree/main/docs) |
[Website](https://datafusion.apache.org/) |
[API Docs](https://docs.rs/datafusion/latest/datafusion/) |
[Chat](https://discord.com/channels/885562378132000778/885562378132000781)

<img src="./docs/source/_static/images/2x_bgwhite_original.png" width="512" alt="logo"/>
<a href="https://datafusion.apache.org/">
<img src="./docs/source/_static/images/2x_bgwhite_original.png" width="512" alt="logo"/>
</a>

Apache DataFusion is a very fast, extensible query engine for building high-quality data-centric systems in
[Rust](http://rustlang.org), using the [Apache Arrow](https://arrow.apache.org)
Expand Down Expand Up @@ -97,9 +98,11 @@ Optional features:

## Rust Version Compatibility Policy

DataFusion's Minimum Required Stable Rust Version (MSRV) policy is to support
each stable Rust version for 6 months after it is
[released](https://github.com/rust-lang/rust/blob/master/RELEASES.md). This
generally translates to support for the most recent 3 to 4 stable Rust versions.
DataFusion's Minimum Required Stable Rust Version (MSRV) policy is to support stable [4 latest
Rust versions](https://releases.rs) OR the stable minor Rust version as of 4 months, whichever is lower.

For example, given the releases `1.78.0`, `1.79.0`, `1.80.0`, `1.80.1` and `1.81.0` DataFusion will support 1.78.0, which is 3 minor versions prior to the most minor recent `1.81`.

If a hotfix is released for the minimum supported Rust version (MSRV), the MSRV will be the minor version with all hotfixes, even if it surpasses the four-month window.

We enforce this policy using a [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)
38 changes: 19 additions & 19 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench "inspired" queries against a single parquet (DataFusion specific)
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
**********
* Supported Configuration (Environment Variables)
Expand Down Expand Up @@ -106,7 +106,7 @@ while [[ $# -gt 0 ]]; do
shift # past argument
usage
;;
-*|--*)
-*)
echo "Unknown option $1"
exit 1
;;
Expand Down Expand Up @@ -175,7 +175,7 @@ main() {
run)
# Parse positional parameters
BENCHMARK=${ARG2:-"${BENCHMARK}"}
BRANCH_NAME=$(cd ${DATAFUSION_DIR} && git rev-parse --abbrev-ref HEAD)
BRANCH_NAME=$(cd "${DATAFUSION_DIR}" && git rev-parse --abbrev-ref HEAD)
BRANCH_NAME=${BRANCH_NAME//\//_} # mind blowing syntax to replace / with _
RESULTS_NAME=${RESULTS_NAME:-"${BRANCH_NAME}"}
RESULTS_DIR=${RESULTS_DIR:-"$SCRIPT_DIR/results/$RESULTS_NAME"}
Expand All @@ -189,7 +189,7 @@ main() {
echo "DATA_DIR: ${DATA_DIR}"
echo "RESULTS_DIR: ${RESULTS_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN": ${PREFER_HASH_JOIN}
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "***************************"

# navigate to the appropriate directory
Expand Down Expand Up @@ -288,7 +288,7 @@ data_tpch() {
echo " tbl files exist ($FILE exists)."
else
echo " creating tbl files with tpch_dbgen..."
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s ${SCALE_FACTOR}
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s "${SCALE_FACTOR}"
fi

# Copy expected answers into the ./data/answers directory if it does not already exist
Expand Down Expand Up @@ -325,7 +325,7 @@ run_tpch() {
RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}"
}

# Runs the tpch in memory
Expand All @@ -341,23 +341,23 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} -m --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}"
}

# Runs the parquet filter benchmark
run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}


Expand All @@ -369,7 +369,7 @@ data_clickbench_1() {
pushd "${DATA_DIR}" > /dev/null

# Avoid downloading if it already exists and is the right size
OUTPUT_SIZE=`wc -c hits.parquet 2>/dev/null | awk '{print $1}' || true`
OUTPUT_SIZE=$(wc -c hits.parquet 2>/dev/null | awk '{print $1}' || true)
echo -n "Checking hits.parquet..."
if test "${OUTPUT_SIZE}" = "14779976446"; then
echo -n "... found ${OUTPUT_SIZE} bytes ..."
Expand All @@ -393,7 +393,7 @@ data_clickbench_partitioned() {
pushd "${DATA_DIR}/hits_partitioned" > /dev/null

echo -n "Checking hits_partitioned..."
OUTPUT_SIZE=`wc -c * 2>/dev/null | tail -n 1 | awk '{print $1}' || true`
OUTPUT_SIZE=$(wc -c -- * 2>/dev/null | tail -n 1 | awk '{print $1}' || true)
if test "${OUTPUT_SIZE}" = "14737666736"; then
echo -n "... found ${OUTPUT_SIZE} bytes ..."
else
Expand All @@ -411,23 +411,23 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o "${RESULTS_FILE}"
}

# Runs the clickbench benchmark with the partitioned parquet files
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o "${RESULTS_FILE}"
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o "${RESULTS_FILE}"
}

compare_benchmarks() {
Expand All @@ -447,12 +447,12 @@ compare_benchmarks() {
fi

echo "Comparing ${BRANCH1} and ${BRANCH2}"
for bench in `ls ${BASE_RESULTS_DIR}/${BRANCH1}` ; do
RESULTS_FILE1="${BASE_RESULTS_DIR}/${BRANCH1}/${bench}"
RESULTS_FILE2="${BASE_RESULTS_DIR}/${BRANCH2}/${bench}"
for RESULTS_FILE1 in "${BASE_RESULTS_DIR}/${BRANCH1}"/*.json ; do
BENCH=$(basename "${RESULTS_FILE1}")
RESULTS_FILE2="${BASE_RESULTS_DIR}/${BRANCH2}/${BENCH}"
if test -f "${RESULTS_FILE2}" ; then
echo "--------------------"
echo "Benchmark ${bench}"
echo "Benchmark ${BENCH}"
echo "--------------------"
PATH=$VIRTUAL_ENV/bin:$PATH python3 "${SCRIPT_DIR}"/compare.py "${RESULTS_FILE1}" "${RESULTS_FILE2}"
else
Expand All @@ -463,7 +463,7 @@ compare_benchmarks() {
}

setup_venv() {
python3 -m venv $VIRTUAL_ENV
python3 -m venv "$VIRTUAL_ENV"
PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
}

Expand Down
15 changes: 15 additions & 0 deletions benchmarks/queries/clickbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ LIMIT 10;
```


### Q3: What is the income distribution for users in specific regions

**Question**: "What regions and social networks have the highest variance of parameter price

**Important Query Properties**: STDDEV and VAR aggregation functions, GROUP BY multiple small ints

```sql
SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPrice"), STDDEV("ParamPrice") as s, VAR("ParamPrice")
FROM 'hits.parquet'
GROUP BY "SocialSourceNetworkID", "RegionID"
HAVING s IS NOT NULL
ORDER BY s DESC
LIMIT 10;
```

## Data Notes

Here are some interesting statistics about the data used in the queries
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/queries/clickbench/extended.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel") FROM hits;
SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage") FROM hits;
SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPrice"), STDDEV("ParamPrice") as s, VAR("ParamPrice") FROM hits GROUP BY "SocialSourceNetworkID", "RegionID" HAVING s IS NOT NULL ORDER BY s DESC LIMIT 10;
2 changes: 1 addition & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl RunOpt {
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
.schema_force_view_types = self.common.force_view_types;

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;
Expand Down
9 changes: 5 additions & 4 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl RunOpt {
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
.schema_force_view_types = self.common.force_view_types;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down Expand Up @@ -268,7 +268,8 @@ impl RunOpt {
}
"parquet" => {
let path = format!("{path}/{table}");
let format = ParquetFormat::default().with_enable_pruning(true);
let format = ParquetFormat::default()
.with_options(ctx.state().table_options().parquet.clone());

(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
Expand Down Expand Up @@ -344,7 +345,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -378,7 +379,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct CommonOpt {
/// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
/// when reading ParquetFiles
#[structopt(long)]
pub string_view: bool,
pub force_view_types: bool,
}

impl CommonOpt {
Expand Down
Loading

0 comments on commit ab88318

Please sign in to comment.