Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'clickbench_extended' benchmark #8860

Closed
alamb opened this issue Jan 14, 2024 · 4 comments · Fixed by #8861
Closed

Add 'clickbench_extended' benchmark #8860

alamb opened this issue Jan 14, 2024 · 4 comments · Fixed by #8861
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Jan 14, 2024

Is your feature request related to a problem or challenge?

The ClickBench benchmark has excellent coverage for aggregate / grouping

We have used the clickbench benchmark, run via bench.sh, for important work improving aggregates such as #6988 and #7064. However there are some important optimizations like #8849 and #7191 from @avantgardnerio where the clickbench benchmark does not cover the existing usecase

For example, @jayzhan211 's change in #8849 (comment) makes certain realistic queries

Details on `bench.sh`

$ ./benchmarks/bench.sh --help

Orchestrates running benchmarks against DataFusion checkouts

Usage:
./benchmarks/bench.sh data [benchmark]
./benchmarks/bench.sh run [benchmark]
./benchmarks/bench.sh compare <branch1> <branch2>

**********
Examples:
**********
# Create the datasets for all benchmarks in /Users/andrewlamb/Software/arrow-datafusion/benchmarks/data
./bench.sh data

# Run the 'tpch' benchmark on the datafusion checkout in /source/arrow-datafusion
DATAFASION_DIR=/source/arrow-datafusion ./bench.sh run tpch

**********
* Commands
**********
data:         Generates data needed for benchmarking
run:          Runs the named benchmark
compare:      Comares results from benchmark runs

**********
* Benchmarks
**********
all(default): Data/Run/Compare for all benchmarks
tpch:                   TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table
tpch_mem:               TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
tpch10:                 TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table
tpch10_mem:             TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
parquet:                Benchmark of parquet reader's filtering speed
sort:                   Benchmark of sorting speed
clickbench_1:           ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet

**********
* Supported Configuration (Environment Variables)
**********
DATA_DIR        directory to store datasets
CARGO_COMMAND   command that runs the benchmark binary
DATAFASION_DIR  directory to use (default /Users/andrewlamb/Software/arrow-datafusion/benchmarks/..)

Describe the solution you'd like

I would like to add a new benchmark to bench.sh that uses the same dataset but has different queries than the existing

$ ./benchmarks/bench.sh run clickbench_extended

The new queries should be

  1. realistic (can write an English sentence explaining the quantity the compute and how it might be used)
  2. Reflect some query pattern

Here is an example from #8849 (comment)

Query: Distinct counts

Query Explanation: Data exploration: understand the qualities of the data in hits.parquet
Query Properties: multiple count distinct aggregates on string datatypes

SELECT
  COUNT(DISTINCT "SearchPhrase"),
  COUNT(DISTINCT "MobilePhone"),
  COUNT(DISTINCT "MobilePhoneModel")
FROM 'hits.parquet';

Describe alternatives you've considered

No response

Additional context

No response

@avantgardnerio
Copy link
Contributor

Hi @alamb , do you know why these are failing to use the code in my PRs? I think there are a number of cases that they could be extended to cover, without an extreme amount of effort.

@alamb
Copy link
Contributor Author

alamb commented Jan 14, 2024

Hi @alamb , do you know why these are failing to use the code in my PRs? I think there are a number of cases that they could be extended to cover, without an extreme amount of effort.

If you mean "why isn't the code added in #7192 used in the ClickBench queries" I think it is because none of the queries has the pattern that PR handles, e.g.

SELECT min(time), group_id FROM ... GROUP BY group_id ORDER BY min(time) LIMIT ...

@avantgardnerio
Copy link
Contributor

Hi @alamb , do you know why these are failing to use the code in my PRs? I think there are a number of cases that they could be extended to cover, without an extreme amount of effort.

If you mean "why isn't the code added in #7192 used in the ClickBench queries" I think it is because none of the queries has the pattern that PR handles, e.g.

SELECT min(time), group_id FROM ... GROUP BY group_id ORDER BY min(time) LIMIT ...

ah, okay. I was hoping some queries were similar enough to be accelerated by that code, given a little effort, but it does not appear that way.

@alamb
Copy link
Contributor Author

alamb commented Jan 14, 2024

ah, okay. I was hoping some queries were similar enough to be accelerated by that code, given a little effort, but it does not appear that way.

Indeed -- from my perspective the ClickBench queries can not be changed (as we don't define them, they come from the clickbench benchmark itself). That is why I propose adding an extended version here.

What I would like to do is to come up with some queries that do have the pattern that is accelerated that use the same data. What about something like:

"Find the top 10 most recently active users"

WITH hits 
AS® (SELECT "UserID", "EventDate"::INT::DATE::TIMESTAMP + arrow_cast("EventTime", 'Interval(DayTime)') as time from 'hits.parquet') 
SELECT min(time), "UserID" from hits GROUP BY "UserID" ORDER BY min(time)  limit 10;

Though this query appears not to use the optimization 🤔

❯ explain WITH hits AS (SELECT "UserID", "EventDate"::INT::DATE::TIMESTAMP + arrow_cast("EventTime", 'Interval(DayTime)') as time from 'hits.parquet') SELECT min(time), "UserID" from hits GROUP BY "UserID" ORDER BY min(time)  limit 10;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |   Sort: MIN(hits.time) ASC NULLS LAST, fetch=10                                                                                                                                                                                                                                                                                                                                                                                         |
|               |     Projection: MIN(hits.time), hits.UserID                                                                                                                                                                                                                                                                                                                                                                                             |
|               |       Aggregate: groupBy=[[hits.UserID]], aggr=[[MIN(hits.time)]]                                                                                                                                                                                                                                                                                                                                                                       |
|               |         SubqueryAlias: hits                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |           Projection: hits.parquet.UserID, CAST(CAST(CAST(hits.parquet.EventDate AS Int32) AS Date32) AS Timestamp(Nanosecond, None)) + CAST(hits.parquet.EventTime AS Interval(DayTime)) AS time                                                                                                                                                                                                                                       |
|               |             TableScan: hits.parquet projection=[EventTime, EventDate, UserID]                                                                                                                                                                                                                                                                                                                                                           |
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |   SortPreservingMergeExec: [MIN(hits.time)@0 ASC NULLS LAST], fetch=10                                                                                                                                                                                                                                                                                                                                                                  |
|               |     SortExec: TopK(fetch=10), expr=[MIN(hits.time)@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                    |
|               |       ProjectionExec: expr=[MIN(hits.time)@1 as MIN(hits.time), UserID@0 as UserID]                                                                                                                                                                                                                                                                                                                                                     |
|               |         AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID], aggr=[MIN(hits.time)]                                                                                                                                                                                                                                                                                                                                           |
|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                   |
|               |             RepartitionExec: partitioning=Hash([UserID@0], 16), input_partitions=16                                                                                                                                                                                                                                                                                                                                                     |
|               |               AggregateExec: mode=Partial, gby=[UserID@0 as UserID], aggr=[MIN(hits.time)]                                                                                                                                                                                                                                                                                                                                              |
|               |                 ProjectionExec: expr=[UserID@2 as UserID, CAST(CAST(CAST(EventDate@1 AS Int32) AS Date32) AS Timestamp(Nanosecond, None)) + CAST(EventTime@0 AS Interval(DayTime)) as time]                                                                                                                                                                                                                                             |
|               |                   ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Downloads/hits.parquet:0..923748528], [Users/andrewlamb/Downloads/hits.parquet:923748528..1847497056], [Users/andrewlamb/Downloads/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Downloads/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Downloads/hits.parquet:3694994112..4618742640], ...]}, projection=[EventTime, EventDate, UserID] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                         |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.048 seconds.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants