Optimize SELECT min/max queries with limit #7198

Closed
alamb opened this issue Aug 4, 2023 · 7 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Aug 4, 2023

Is your feature request related to a problem or challenge?

The following query pattern shows up in many of our use cases:

SELECT tag, max(time)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10

There may also be predicates.

In English, this query returns the 10 groups that have the most recent values.
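For reference, the unoptimized evaluation of this pattern can be sketched as follows. This is a minimal illustration only, not DataFusion code: the function name `latest_groups` and the tuple layout of `rows` are assumptions made for the example. It aggregates every group before sorting and applying the limit, so its state grows with the number of distinct tags rather than with the limit.

```python
def latest_groups(rows, k):
    """Naive plan: full GROUP BY, then ORDER BY max(time) DESC, then LIMIT k.

    `rows` is an iterable of (tag, time) pairs (hypothetical layout).
    """
    max_time = {}  # one entry per distinct tag: memory grows with group count
    for tag, time in rows:
        if tag not in max_time or time > max_time[tag]:
            max_time[tag] = time
    # Sort all groups by their max(time), newest first, then keep k of them.
    ranked = sorted(max_time.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:k]


rows = [("a", 1), ("b", 5), ("a", 7), ("c", 3), ("b", 2)]
top2 = latest_groups(rows, 2)
```

Only `k` groups survive the limit, which is why an operator that never materializes the other groups is attractive.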

As a more specific example, @JayjeetAtGithub found that the Jaeger tool issues this query to show the most recent traces (LIMIT 20 in this instance):

SELECT "trace_id", MAX("time") AS t
FROM 'spans'
WHERE "service.name" = 'frontend'
  AND "time" >= to_timestamp(1688713200000000000)
  AND "time" <= to_timestamp(1689000240000000000)
GROUP BY "trace_id"
ORDER BY t DESC
LIMIT 20;

Describe the solution you'd like

Implement some sort of optimization for this query

Describe alternatives you've considered

I believe #7191 / #7192 from @avantgardnerio is designed for this use case, so that may be sufficient. However, I thought it was worth documenting the actual end-user effect of the change as a separate item, which is why I filed this ticket.

Additional context

No response

@alamb alamb added the enhancement New feature or request label Aug 4, 2023
@alamb
Contributor Author

alamb commented Aug 4, 2023

I am not sure if we can get a general-purpose optimization that also handles queries with different aggregates:

SELECT tag, field, max(time), min(other_field)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10

Though maybe that is not so useful

@avantgardnerio
Contributor

avantgardnerio commented Aug 5, 2023

if we can get a general purpose optimization that also handles queries with different aggregates

I don't think #7192 can handle your example. It works by "evicting" (nice term, @tustvold) groups from the accumulator unless they are the current min/max.

So if we run your example:

SELECT tag, field, max(time), min(other_field)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10

on:

+-----+-------+-------+-------------+
| tag | time  | field | other_field |
+-----+-------+-------+-------------+
| 1   | 01:00 |       | 0           |
| 2   | 02:00 |       | 1           |
| 3   | 03:00 |       | 1           |
| 4   | 04:00 |       | 1           |
| 5   | 05:00 |       | 1           |
| 6   | 06:00 |       | 1           |
| 7   | 07:00 |       | 1           |
| 8   | 08:00 |       | 1           |
| 9   | 09:00 |       | 1           |
| 10  | 10:00 |       | 1           |
| 11  | 11:00 |       | 1           |
| 1   | 12:00 |       | 999         |
+-----+-------+-------+-------------+

We will:

  1. accumulate 10 tag-groups
  2. run into tag-group 11 with a greater time than tag-group 1 (11:00 vs 01:00)
  3. evict tag-group 1 from our accumulator, along with its other_field=0 value
  4. run into tag-group 1 again with a greater time than tag-group 2 (12:00 vs 02:00)
  5. evict tag-group 2, replace it with tag-group 1, and its new "min" of 999

Hopefully this example makes it clear why we can only accumulate values present in the ORDER BY clause, given the approach in #7192, which is based on the functional requirements (not sorting) of #7191.
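The walk-through above can be replayed with a toy model. This is a simplified sketch, not the code in #7192: the function name, the dict-based accumulator, and the encoding of times as integer hours are all assumptions made for illustration. It keeps at most `k` groups and evicts the group with the smallest max(time) whenever a new group pushes it over capacity.

```python
def topk_groups_with_eviction(rows, k):
    """Toy accumulator: tag -> [max(time), min(other_field)], capped at k groups."""
    groups = {}
    for tag, time, other in rows:
        if tag in groups:
            state = groups[tag]
            state[0] = max(state[0], time)
            state[1] = min(state[1], other)
        else:
            groups[tag] = [time, other]
            if len(groups) > k:
                # Evict the group whose max(time) is currently the smallest.
                # Its min(other_field) state is lost along with it.
                worst = min(groups, key=lambda t: groups[t][0])
                del groups[worst]
    return groups


# The table from the comment above, with time written as an integer hour.
rows = [(1, 1, 0)] + [(t, t, 1) for t in range(2, 12)] + [(1, 12, 999)]
result = topk_groups_with_eviction(rows, k=10)

# The correct min(other_field) for tag 1, computed without eviction.
true_min_tag1 = min(other for tag, _, other in rows if tag == 1)
```

In this model, tag 1 ends with max(time) = 12 but min(other_field) = 999, while the true minimum is 0, because the earlier row was discarded when tag 1 was first evicted.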

@alamb
Contributor Author

alamb commented Aug 5, 2023

The more I think about this, the more I like where @avantgardnerio is going with #7192, and I think we could use the same operator in #7192 for this ticket as well as #6899, and #7196.

I hope we can use the same operator for all these queries because:

  1. It will allow us to pool resources (to make it very fast and efficient)
  2. We can keep the boundaries clearly defined (and thus keep the long-term maintenance cost down)

"Observation" -- No Aggregates

One key observation that @avantgardnerio made (perhaps implicitly) in #7192 is that even though the query in this ticket has aggregates (max(time)) there is no actual aggregation -- what is needed is to 'keep the top K items per group' where 'top' is defined by some particular sort order.

Proposal

Thus, I think we could make the code in #7192 into a TopKPerPartition ExecutionPlan [1] that has the following semantics:

Keeps the top K values, as defined by order_exprs, for each distinct value of the partition expressions (gby_exprs in the diagram below).

┌───────────────────────────────┐
│       TopKPerPartition        │
│           gby_exprs           │
│          order_exprs          │
│               K               │
└───────────────────────────────┘
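One way to read these semantics is sketched below. This is an illustration under stated assumptions, not an existing DataFusion operator: `top_k_per_partition`, `partition_key`, and `order_key` are hypothetical names standing in for the operator, gby_exprs, and order_exprs, and "top" here means largest (a DESC order). Each partition gets a bounded min-heap of size K, so memory is proportional to the number of partitions times K rather than to the input size.

```python
import heapq
from collections import defaultdict


def top_k_per_partition(rows, partition_key, order_key, k):
    """Keep the k rows with the largest order_key for each partition_key value."""
    heaps = defaultdict(list)
    for seq, row in enumerate(rows):
        # seq is unique, so tuple comparison never falls through to the row itself.
        entry = (order_key(row), seq, row)
        heap = heaps[partition_key(row)]
        if len(heap) < k:
            heapq.heappush(heap, entry)
        elif entry > heap[0]:
            # The new row beats the smallest retained row: swap it in.
            heapq.heapreplace(heap, entry)
    # Emit each partition's rows from largest to smallest order_key.
    return {
        part: [entry[2] for entry in sorted(heap, reverse=True)]
        for part, heap in heaps.items()
    }


rows = [
    {"tag": "a", "time": 1},
    {"tag": "a", "time": 5},
    {"tag": "a", "time": 3},
    {"tag": "b", "time": 2},
]
result = top_k_per_partition(rows, lambda r: r["tag"], lambda r: r["time"], k=2)
```

With an empty partition key (for example `lambda r: ()`), the same function degenerates to a plain top-K over the whole input.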

Use for min/max queries with limit (this ticket)

So for the query

SELECT tag, max(time)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10

We would use

TopKPerPartition
  gby_exprs: [tag]
  order_exprs: [time DESC]
  k: 10

General purpose ORDER BY limit query #7196

SELECT c1, c2 
FROM t
ORDER BY c3
LIMIT 10

We could use the same operator (though maybe it has a more optimized implementation when there are no groups, like we have for no group aggregate streams):

TopKPerPartition
  gby_exprs: []
  order_exprs: [c3]
  k: 10
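For the no-groups case, the specialized path could be as simple as a bounded heap over the input. The sketch below uses Python's `heapq.nsmallest`, which consumes the input in one pass and retains only `k` candidates; the function name `order_by_limit` and the dict-shaped rows are assumptions made for the example, not part of any proposal in this thread.

```python
import heapq


def order_by_limit(rows, k):
    """SELECT c1, c2 FROM t ORDER BY c3 LIMIT k, without sorting the whole input."""
    smallest = heapq.nsmallest(k, rows, key=lambda r: r["c3"])
    return [(r["c1"], r["c2"]) for r in smallest]


rows = [
    {"c1": "v", "c2": 50, "c3": 5},
    {"c1": "w", "c2": 10, "c3": 1},
    {"c1": "x", "c2": 40, "c3": 4},
    {"c1": "y", "c2": 20, "c3": 2},
    {"c1": "z", "c2": 30, "c3": 3},
]
first_two = order_by_limit(iter(rows), 2)
```

Because the input is only iterated once, this also works on a stream of rows that does not fit in memory.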

Queries that have a predicate on row_number() in #6899

SELECT ...
  ROW_NUMBER() OVER (PARTITION BY value1 ORDER BY value2) as rn
WHERE
  rn <= 10

we could use

TopKPerPartition
  gby_exprs: [value1]
  order_exprs: [value2]
  k: 10

P.S. I also tried, and failed, to think of any clever rewrites at the SQL level.

Footnotes

  1. I think this is what @ozankabak and @comphead were hinting at in https://github.com/apache/arrow-datafusion/issues/6899#issuecomment-1630479576

@JayjeetAtGithub
Contributor

Reproducer using Jaeger/IOx: https://github.com/JayjeetAtGithub/iox_observe_bench/blob/main/docs/oom_kill.md
Dataset: https://drive.google.com/drive/folders/1nd3FaZXlsvM8JelXHJjHZONDzvB9UeVs?usp=sharing

@avantgardnerio
Contributor

Dataset

@JayjeetAtGithub thank you! I just sent a request for access...

@alamb
Contributor Author

alamb commented Aug 8, 2023

Note that the dataset in the above example is in the form of an influxdb_iox catalog.

If you prefer a datafusion-cli only version, here it is:

Download traces.zip (240MB):

In datafusion-cli:

❯ create external table traces stored as parquet location 'traces';
0 rows in set. Query took 0.030 seconds.

❯ SELECT trace_id, MAX(time) FROM traces GROUP BY trace_id ORDER BY MAX(time) DESC LIMIT 1;

@alamb
Contributor Author

alamb commented Sep 13, 2023

Completed in #7192

@alamb alamb closed this as completed Sep 13, 2023
3 participants