
Dynamic pruning filters from TopK state (optimize ORDER BY LIMIT queries) #15037

Open · Tracked by #15512 · May be fixed by #15301
adriangb opened this issue Mar 5, 2025 · 12 comments

Labels: enhancement (New feature or request)

adriangb (Contributor) commented Mar 5, 2025

Is your feature request related to a problem or challenge?

From a discussion with @alamb yesterday, the idea came up of optimizing queries like `select * from data order by timestamp desc limit 10` for the case where the data is not perfectly sorted by timestamp but mostly follows a sorted pattern.

You can imagine this data gets created if multiple sources with clock skews, network delays, etc. are writing data and you don't do anything fancy to guarantee perfect sorting by timestamp (i.e. you naively write out the data to Parquet, maybe do some compaction, etc.). The point is that 99% of yesterday's files have a timestamp smaller than 99% of today's files but there may be a couple seconds of overlap between files. To be concrete, let's say this is our data:

| file | min | max |
|------|-----|-----|
| 1    | 1   | 10  |
| 2    | 9   | 19  |
| 3    | 20  | 31  |
| 4    | 30  | 35  |

Currently DataFusion will exhaustively open each file, read the timestamp column, and feed it into a TopK.
I think we can do a lot better if we:

  • Use file stats to decide which files to work on first. In this case it makes sense to start with files 4 and 3 (assuming we have a parallelism of 2).
  • Let's say that between those two we have 10 rows, so we've already filled up our TopK. The only way more rows get added to our TopK is if they are greater than the smallest item already seen (let's say that's 20, the smallest value in file 3).
  • Now we know, just from statistics, that we can skip files 2 and 1, because neither of them can have any timestamp > 20 (see the sketch just below this list).
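
A minimal sketch of that pruning rule in Rust (the `FileStats` struct and `prune_files` helper are hypothetical names for illustration, not DataFusion APIs):

```rust
// Hypothetical per-file min/max statistics for the timestamp column.
#[allow(dead_code)]
struct FileStats {
    file: usize,
    min: i64,
    max: i64,
}

/// Once the TopK heap is full, its smallest value becomes a threshold:
/// any file whose `max` statistic cannot exceed it is skipped entirely.
fn prune_files(files: &[FileStats], topk_threshold: Option<i64>) -> Vec<usize> {
    files
        .iter()
        .filter(|f| match topk_threshold {
            Some(t) => f.max > t,
            None => true, // heap not full yet: every file must still be read
        })
        .map(|f| f.file)
        .collect()
}

fn main() {
    let files = [
        FileStats { file: 1, min: 1, max: 10 },
        FileStats { file: 2, min: 9, max: 19 },
        FileStats { file: 3, min: 20, max: 31 },
        FileStats { file: 4, min: 30, max: 35 },
    ];
    // After reading files 4 and 3 the heap holds 10 rows and its smallest
    // value is 20, so files 1 and 2 (max <= 20) can be skipped.
    assert_eq!(prune_files(&files, Some(20)), vec![3, 4]);
}
```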

Extrapolating this to scenarios where you have years' worth / TBs of data and want a `limit 5`, I think this would yield orders-of-magnitude improvements.

@alamb mentioned this sounds similar to Dynamic Filters. I assume this must be a known technique (or my analysis may be completely wrong 😆), but I don't know what it would be called.

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

adriangb added the enhancement (New feature or request) label Mar 5, 2025
alamb (Contributor) commented Mar 5, 2025

> @alamb mentioned this sounds similar to Dynamic Filters. I assume this must be a known technique (or my analysis may be completely wrong 😆), but I don't know what it would be called.

There was a talk at CIDR this year that mentioned this:

Sponsor Talk 3: The Fine Art of Work Skipping
Stefan Mandl, Snowflake

It seems they wrote a blog about it too here: https://www.snowflake.com/en/engineering-blog/optimizing-top-k-aggregation-snowflake/

adriangb (Contributor, Author) commented Mar 5, 2025

Nice to know I'm not totally off on the idea 😄

alamb (Contributor) commented Mar 5, 2025

> Nice to know I'm not totally off on the idea 😄

Not at all!

alamb (Contributor) commented Mar 12, 2025

BTW I am pretty sure DuckDB is using this technique, which is why they are so much faster on ClickBench Q23.

adriangb (Contributor, Author)

Does anyone have a handle on how we might implement this? I was thinking we'd need to add a method to exec operators called `apply_filter` that sends down the additional filter; by default it gets forwarded to children until it hits an exec that knows what to do with it (e.g. `DataSourceExec`). But I'm not very clear beyond that.
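
For illustration, a rough sketch of that default-forwarding shape (a hypothetical `ExecNode` trait, with string filters as a stand-in for physical expressions; DataFusion's actual `ExecutionPlan` API differs):

```rust
/// Hypothetical trait for physical operators; the filter is a string here
/// as a stand-in for a real physical expression.
trait ExecNode {
    fn children_mut(&mut self) -> Vec<&mut dyn ExecNode>;

    /// By default an operator just forwards the filter down to its children.
    fn apply_filter(&mut self, filter: &str) {
        for child in self.children_mut() {
            child.apply_filter(filter);
        }
    }
}

/// An operator with no special handling inherits the default and passes
/// the filter through untouched.
struct CoalesceBatchesExec {
    child: Box<dyn ExecNode>,
}

impl ExecNode for CoalesceBatchesExec {
    fn children_mut(&mut self) -> Vec<&mut dyn ExecNode> {
        vec![self.child.as_mut()]
    }
}

/// A scan knows what to do with the filter: remember it for pruning.
#[derive(Default)]
struct DataSourceExec {
    pushed_filters: Vec<String>,
}

impl ExecNode for DataSourceExec {
    fn children_mut(&mut self) -> Vec<&mut dyn ExecNode> {
        vec![] // leaf node
    }

    fn apply_filter(&mut self, filter: &str) {
        self.pushed_filters.push(filter.to_owned());
    }
}

fn main() {
    // Pushed at the top of the plan, the filter flows through to the scan.
    let mut plan = CoalesceBatchesExec {
        child: Box::new(DataSourceExec::default()),
    };
    plan.apply_filter("timestamp > 20");
}
```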

alamb (Contributor) commented Mar 18, 2025

> Does anyone have a handle on how we might implement this? I was thinking we'd need to add a method to exec operators called `apply_filter` that sends down the additional filter; by default it gets forwarded to children until it hits an exec that knows what to do with it (e.g. `DataSourceExec`). But I'm not very clear beyond that.

To begin with I would suggest:

  1. Make a new PhysicalExpr named something like `TopKRuntimeFilter`
  2. Add a physical optimizer pass that runs after all other passes (so the structure doesn't change) that finds TopK nodes and tries to find connected Scans (start with some basic rules, don't try to go past joins, etc.)
  3. Add the `TopKRuntimeFilter` to those scans

Then the trick will be to figure out how to share the TopKHeap created in the TopK operator with the `TopKRuntimeFilter`, and then orchestrate concurrent access to it somehow.
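
A minimal sketch of that sharing, assuming an `Arc`-shared handle that the TopK operator writes and the filter reads (all names here are hypothetical, not the PR's actual code):

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical shared state: the TopK operator owns the heap and publishes
/// its current threshold (the smallest value in a full heap) here.
#[derive(Default)]
struct SharedTopKState {
    threshold: RwLock<Option<i64>>,
}

/// Hypothetical filter the scan evaluates; it only ever reads the threshold.
struct TopKRuntimeFilter {
    state: Arc<SharedTopKState>,
}

impl TopKRuntimeFilter {
    /// Keep a row only if it could still make it into the top k.
    fn keep(&self, value: i64) -> bool {
        match *self.state.threshold.read().unwrap() {
            Some(t) => value > t,
            None => true, // heap not full yet: nothing can be pruned
        }
    }
}

fn main() {
    let state = Arc::new(SharedTopKState::default());
    let filter = TopKRuntimeFilter { state: Arc::clone(&state) };

    assert!(filter.keep(5)); // no threshold published yet

    // The TopK operator side: once the heap fills, publish its smallest value.
    *state.threshold.write().unwrap() = Some(20);

    assert!(!filter.keep(15)); // cannot displace anything in the heap
    assert!(filter.keep(31)); // might enter the top k
}
```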

adriangb added a commit to pydantic/datafusion that referenced this issue Mar 19, 2025
adriangb linked a pull request Mar 19, 2025 that will close this issue
adriangb (Contributor, Author)

@alamb I implemented something like that in #15301

alamb (Contributor) commented Mar 20, 2025

Thanks @adriangb -- I will try and review it asap (hopefully tomorrow afternoon or tomorrow)

adriangb added a commit to pydantic/datafusion that referenced this issue Mar 20, 2025
adriangb (Contributor, Author)

We already have `Statistics` on `PartitionedFile`, so we could potentially use dynamic filters to prune based on those before even opening the file.

adriangb added a commit to pydantic/datafusion that referenced this issue Mar 26, 2025
adriangb added a commit to pydantic/datafusion that referenced this issue Mar 27, 2025
adriangb added a commit to pydantic/datafusion that referenced this issue Mar 27, 2025
alamb (Contributor) commented Mar 28, 2025

@adriangb and I had a discussion about #15301

Here are some notes:

Use cases:

  • TopK dynamic filter pushdown
    • Prune files with dynamic filter based on statistics
    • Prune row groups with dynamic filter based on statistics
    • Prune row pages with dynamic filter based on statistics
    • Apply during filtering when pushdown enabled
  • Join SIPs (sideways information passing)

Pros / Cons

The pros for merging this PR:

  • We already have benchmarks that show some performance improvement.

The cons:

  • It requires a special implementation for any operator (like FileOpener) to take advantage of such filters. This is not a blocker in my mind – but I do think implementing a PhysicalExpr is a cleaner design. As Adrian says, we can refactor it over time if/when PhysicalExpr gets more sophisticated.
  • We will get even more performance when filter_pushdown is enabled (again, maybe this is just follow-on work).

Nice to haves

  • For a plan with multiple partitions there is one TopK heap per partition plus a global one (e.g. for 16 input partitions we end up with 17 heaps), but this PR can only apply the per-partition top k value.
  • It would be nice to somehow be able to use all the top values (aka pick the smallest one) when filtering (see the sketch at the end of this comment).
  • This PR takes a snapshot of the contents of the TopK heap when a file is opened and never changes it.
    • This is good for pruning, as all the pruning (file, row group and page) happens on file opening.
    • It is not as good for filter_pushdown, where the values in the TopK heap can change over the course of the query, so using a snapshot means the dynamic filter doesn't improve over time.

I believe Adrian is going to look into these – but I also think they could easily be done as a follow-on PR.
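
For the second nice-to-have, a tiny sketch of the "pick the smallest one" rule, under the assumption that a single shared filter must stay correct for every partition's heap (a hypothetical helper, not the PR's code):

```rust
/// Hypothetical helper: each partition reports the smallest value in its
/// TopK heap once the heap is full (`Some`), or `None` while it is not.
/// A filter shared by all partitions must use the weakest bound, i.e. the
/// minimum of the per-partition thresholds, and no bound at all while any
/// heap still accepts every row.
fn global_threshold(per_partition: &[Option<i64>]) -> Option<i64> {
    per_partition
        .iter()
        .copied()
        .collect::<Option<Vec<i64>>>() // None if any partition has no bound yet
        .and_then(|ts| ts.into_iter().min())
}

fn main() {
    // One of three heaps is not full yet: no safe global threshold.
    assert_eq!(global_threshold(&[Some(20), Some(25), None]), None);
    // All heaps full: the safe shared threshold is the smallest one.
    assert_eq!(global_threshold(&[Some(20), Some(25), Some(30)]), Some(20));
}
```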

adriangb (Contributor, Author)

Wrt waiting for filter pushdown to be enabled by default: I think we're just making our lives harder by coupling them, especially since we can already test them together under a feature flag.

I also would like to leverage this work to justify a lot of other optimizations:

  • piping through file level stats and using those to prune with dynamic filters -> prune without even reading parquet metadata
  • roughly ordering files by the sort order based on stats -> this optimization becomes even more efficient
  • resolve merge conflicts and take next steps with per-file filters

This is unfortunately a blocker for all of that.

alamb changed the title from "Dynamic pruning filters from TopK state" to "Dynamic pruning filters from TopK state (optimize ORDER BY LIMIT queries)" Mar 31, 2025
alamb (Contributor) commented Mar 31, 2025

I plan to spend a non-trivial amount of time working on this with @adriangb this week.
