[EPIC] Improve the performance of ListingTable #9964
Comments
Interested in this one |
FYI I think this is more like an Epic that can be used to coordinate individual tasks / changes rather than a specific change itself.
Thanks @Lordworms -- one thing that would probably help to start this project along would be to gather some data. Specifically, point the ListingTable at data on a remote object store (e.g. figure out how to write a query against 100 parquet files in an S3 bucket), and then measure how much time is spent:
Does anyone know a good public data set on S3 that we could use to test / benchmark with? |
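One way to gather that kind of data is to wrap each phase of a query in a timer and compare the totals. The following is a minimal Python sketch, independent of DataFusion's Rust API; the phase names (`list`, `fetch_metadata`) and the `time.sleep` calls are hypothetical placeholders for whatever phases end up being measured.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class PhaseTimer:
    """Accumulates wall-clock time and call counts per named phase."""

    def __init__(self):
        self.totals = defaultdict(float)
        self.counts = defaultdict(int)

    @contextmanager
    def phase(self, name):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the elapsed time even if the timed block raises.
            self.totals[name] += time.perf_counter() - start
            self.counts[name] += 1

    def report(self):
        """Return (phase, total seconds, calls) rows, slowest phase first."""
        rows = [(n, self.totals[n], self.counts[n]) for n in self.totals]
        return sorted(rows, key=lambda row: row[1], reverse=True)


# Hypothetical usage: sleeps stand in for real object store requests.
timer = PhaseTimer()
with timer.phase("list"):
    time.sleep(0.01)
for _ in range(3):
    with timer.phase("fetch_metadata"):
        time.sleep(0.01)

for name, seconds, calls in timer.report():
    print(f"{name:>15}: {seconds * 1000:.1f} ms over {calls} call(s)")
```

Counting calls alongside elapsed time helps separate "one slow request" from "many fast requests", which matters when deciding what to cache.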
I got it, I'll search some data first |
I'll start with a random dataset here https://registry.opendata.aws/ |
(It's not a "performance" issue, but rather one of user experience. It also requires upstream changes to arrow-rs/object_store.) The recent change in #9912 uses 10 random files to infer the partition columns, which means that we may fail to catch corrupted/manually-changed partitions on table creation (this shouldn't be a common case). This is because
If we can provide a BFS approach to traverse the Object Store and use that in partition inference, that will be a nice QoL change. Curious to know how we feel about upstream changes for such non-critical changes in DF? |
I think
In this particular case, I think there are different needs for different users (some might want to pay the cost for a full validation, but some might be happy with just a sanity check). One approach would be to add a config option to DataFusion that controls the maximum number of paths to check when creating a listing table 🤔
In general I think upstream changes are great to propose if they help more than just DataFusion |
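The trade-off between a full validation and a sanity check can be sketched as partition inference over a bounded sample of paths. This is a hedged Python illustration, not DataFusion's implementation: the function names, the `max_paths` parameter, and the example paths are all hypothetical.

```python
import random


def partition_columns(path, table_root):
    """Return the hive-style key names found between table_root and the file name."""
    relative = path[len(table_root):].strip("/")
    segments = relative.split("/")[:-1]  # drop the file name
    return [seg.split("=", 1)[0] for seg in segments if "=" in seg]


def infer_partition_columns(paths, table_root, max_paths=None, seed=None):
    """Infer partition columns from at most `max_paths` paths (None checks all).

    Raises ValueError when the checked paths disagree with each other.
    """
    paths = list(paths)
    if max_paths is not None and len(paths) > max_paths:
        # Sanity check only: a sample can miss an inconsistent partition.
        paths = random.Random(seed).sample(paths, max_paths)
    seen = {tuple(partition_columns(p, table_root)) for p in paths}
    if len(seen) > 1:
        raise ValueError(f"inconsistent partition columns: {sorted(seen)}")
    return list(seen.pop()) if seen else []


# Hypothetical table layout; the last path uses a different partition key.
example_paths = [
    "/t/year=2023/month=01/a.parquet",
    "/t/year=2023/month=02/b.parquet",
    "/t/yr=2024/c.parquet",
]
print(infer_partition_columns(example_paths[:2], "/t"))
```

With `max_paths=None` every path is checked and the inconsistent third path raises an error; with a small `max_paths` the check is cheaper but may not see that path at all.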
I have done some basic play with the bitcoin dataset > 2. fetching metadata
just want to know what is a good start to solving this issue, should I implement the cache https://github.com/apache/arrow-datafusion/blob/2b0a7db0ce64950864e07edaddfa80756fe0ffd5/datafusion/execution/src/cache/mod.rs here first? |
that is very interesting
> just want to know what is a good start to solving this issue, should I implement the cache https://github.com/apache/arrow-datafusion/blob/2b0a7db0ce64950864e07edaddfa80756fe0ffd5/datafusion/execution/src/cache/mod.rs here first?
If indeed most of the execution time is spent parsing (or fetching) parquet metadata, implementing a basic cache would likely help.
Also, @tustvold brought https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/parquet/trait.ParquetFileReaderFactory.html to my attention, which might be able to help avoid the overhead.
So what I suggest is:
The reason to do the POC first is that performance analysis is notoriously tricky at the system level, so you want to have evidence your work will actually improve performance before you spend a bunch of time polishing up the PR (it is very demotivating, at least to me, to make a beautiful PR only to find out it doesn't really help performance) |
Got it |
@Lordworms if I recall correctly, the S3 list call is made on every query, and if the number of files is large this can be non-trivial. If the listed files can be cached after table creation / the first query, then we could save that listing time on subsequent queries. |
Yeah, I got it, I am implementing it, but I am confused about the caching granularity: whether to cache the whole parquet file or a portion of the file. I may have misunderstood at the beginning, since I thought we should improve the performance of the CREATE statement. |
I think both could be done (we actually do that), but to me the metadata would be a good place to start, as to my understanding the builtin cache would be better suited for something like object metadata than for whole parquet files. |
Sure, I got it. Thanks for the advice |
One additional comment: I'm not sure how the existing cache would perform for range queries. For example, we use a trie in our implementation. Depending on your findings that could be a follow-on though, as there might still be enough benefit from a baseline implementation. |
Yes, I think the basic implementation could not handle both the directory level and the file level. I may implement a trie or another BST-like structure |
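To make the directory-level versus file-level distinction concrete, here is a minimal Python sketch of a path trie that supports both exact lookups and prefix listing. It is an illustration only: `PathTrie` and its methods are hypothetical names, and this is not the trie used in either PR.

```python
def _parts(path):
    """Split an object path into non-empty segments."""
    return [p for p in path.strip("/").split("/") if p]


class PathTrie:
    """Maps object paths to values and answers prefix (directory-level) lookups."""

    def __init__(self):
        self.children = {}
        self.value = None
        self.has_value = False

    def insert(self, path, value):
        node = self
        for part in _parts(path):
            node = node.children.setdefault(part, PathTrie())
        node.value = value
        node.has_value = True

    def _find(self, path):
        node = self
        for part in _parts(path):
            node = node.children.get(part)
            if node is None:
                return None
        return node

    def get(self, path):
        """File-level lookup: return the value stored for exactly this path."""
        node = self._find(path)
        return node.value if node is not None and node.has_value else None

    def list_prefix(self, prefix):
        """Directory-level lookup: yield (path, value) for entries at or below prefix."""
        start = self._find(prefix)
        if start is None:
            return
        stack = [(_parts(prefix), start)]
        while stack:
            parts, node = stack.pop()
            if node.has_value:
                yield "/".join(parts), node.value
            for name, child in node.children.items():
                stack.append((parts + [name], child))


trie = PathTrie()
trie.insert("t/year=2023/a.parquet", 1)
trie.insert("t/year=2023/b.parquet", 2)
trie.insert("t/year=2024/c.parquet", 3)
print(sorted(trie.list_prefix("t/year=2023")))
```

Prefixes match on whole path segments, so `t/year=202` matches nothing while `t/year=2023` matches both files in that directory. A flat hash map answers the file-level lookup equally well but has to scan every key for the directory-level one.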
I have implemented a basic LRU metadata cache, and I think caching only the metadata would give a slight performance improvement: we call the List_Object API just once, but we call the Get_Object API more than x times, where x is the number of files. Most of the time is therefore spent in get_object calls, since we must call it for each object. So I think caching the parquet file itself should be a better way, since most of the time is consumed calling the get_range function (which calls the GetObject API and then reads the range of the parquet file)
I have been confused for the last two days and really appreciate your help @alamb @matthewmturner |
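For readers following along, a basic LRU metadata cache of the kind discussed here could look roughly like the Python sketch below. It is not the code from either PR: the class name, the `get_or_load` method, and the `loader` callback (standing in for a per-object metadata request) are assumptions made for illustration.

```python
import threading
from collections import OrderedDict


class LruMetadataCache:
    """Thread-safe LRU cache keyed by object path."""

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._capacity = capacity
        self._entries = OrderedDict()  # oldest entry first
        self._lock = threading.Lock()
        self.hits = 0
        self.misses = 0

    def get_or_load(self, path, loader):
        with self._lock:
            if path in self._entries:
                self._entries.move_to_end(path)  # mark as most recently used
                self.hits += 1
                return self._entries[path]
            self.misses += 1
        # Load outside the lock so a slow remote fetch does not block other
        # readers. Two threads may load the same path concurrently; the last
        # writer wins, which is harmless for immutable metadata.
        value = loader(path)
        with self._lock:
            self._entries[path] = value
            self._entries.move_to_end(path)
            while len(self._entries) > self._capacity:
                self._entries.popitem(last=False)  # evict least recently used
        return value


# Hypothetical usage: the loader records which paths trigger a real fetch.
fetched = []


def fake_loader(path):
    fetched.append(path)
    return {"path": path, "num_rows": 0}


cache = LruMetadataCache(capacity=2)
for p in ["a", "b", "a", "c", "b"]:
    cache.get_or_load(p, fake_loader)
print(fetched, cache.hits, cache.misses)
```

Tracking `hits` and `misses` makes it easier to report how many requests the cache actually avoided for a given query, rather than only the end-to-end time.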
@Lordworms thanks for the work on this. Just to confirm - what was the improvement in milliseconds we saw from the object meta cache? For context, in low latency use cases (we have that requirement) even shaving off a few ms can be important so would still want to get that improvement in. IMO it would be good to close the object meta cache topic first before moving on to file cache - unless there is some connection there that I am missing where we need to consider them together. |
Sure, from left to right: unchanged, SequenceTrie cached, ThreadSafeLRU DashMap cached |
That's interesting that dashmap performed better. Would you mind also doing a query with a filter / pruning involved and comparing the results (perhaps with a range scan as well)? And just to confirm, the cache is filled at creation time during schema inference, right? Also is the cache used by both
I think it would also be nice to have a new benchmark for this to help monitor moving forward. |
currently used by ListingTable
Yes, could you give me some review advice when you are available, to see if I implemented it right? The closed PR is SequenceTrie and the draft PR is DashMap. I really appreciate your help.
I want to do it, too, but I could not find a suitable open-source dataset for it. Currently, I am using bitcoin. |
@Lordworms apologies it took me longer than expected to get some free time. I plan to review between tonight and tomorrow. |
@Lordworms did you get the chance to compare querying with a filter / pruning involved (ideally with a range) between dashmap and sequence trie? Not sure if the dataset is conducive to that though. |
I haven't added filter test cases yet, but I can do that in the following two days, I'll give you an update then. |
I was playing around today and wanted to post an example that might help people
I found a parquet formatted wikipedia dataset: https://huggingface.co/datasets/wikimedia/wikipedia
The english subset is here: https://huggingface.co/datasets/wikimedia/wikipedia/tree/main/20231101.en
Here is how to create an external table that reads the first file:
> create external table wikipedia stored as parquet location 'https://huggingface.co/datasets/wikimedia/wikipedia/resolve/main/20231101.en/train-00000-of-00041.parquet';
> describe wikipedia;
+-------------+-----------+-------------+
| column_name | data_type | is_nullable |
+-------------+-----------+-------------+
| id | Utf8 | YES |
| url | Utf8 | YES |
| title | Utf8 | YES |
| text | Utf8 | YES |
+-------------+-----------+-------------+
4 row(s) fetched.
Elapsed 0.001 seconds.
> select id, url, title from wikipedia limit 10;
+-----+----------------------------------------------------------------------------------+------------------------------------------+
| id | url | title |
+-----+----------------------------------------------------------------------------------+------------------------------------------+
| 12 | https://en.wikipedia.org/wiki/Anarchism | Anarchism |
| 39 | https://en.wikipedia.org/wiki/Albedo | Albedo |
| 290 | https://en.wikipedia.org/wiki/A | A |
| 303 | https://en.wikipedia.org/wiki/Alabama | Alabama |
| 305 | https://en.wikipedia.org/wiki/Achilles | Achilles |
| 307 | https://en.wikipedia.org/wiki/Abraham%20Lincoln | Abraham Lincoln |
| 308 | https://en.wikipedia.org/wiki/Aristotle | Aristotle |
| 309 | https://en.wikipedia.org/wiki/An%20American%20in%20Paris | An American in Paris |
| 316 | https://en.wikipedia.org/wiki/Academy%20Award%20for%20Best%20Production%20Design | Academy Award for Best Production Design |
| 324 | https://en.wikipedia.org/wiki/Academy%20Awards | Academy Awards |
+-----+----------------------------------------------------------------------------------+------------------------------------------+
10 row(s) fetched.
Elapsed 1.143 seconds.
> select avg(length(title)) from wikipedia;
+----------------------------------------+
| AVG(character_length(wikipedia.title)) |
+----------------------------------------+
| 18.739130712974042 |
+----------------------------------------+
1 row(s) fetched.
Elapsed 20.559 seconds.
|
Sorry for the late update, I was traveling last week, I'll test it today and give you feedback |
Is your feature request related to a problem or challenge?
The ListingTable works quite well in practice, but like all software could be made better. I am writing up this ticket to enumerate some areas for improvement in the hopes people who are interested can collaborate / coordinate their efforts
Background
DataFusion has a ListingTable that effectively reads tables stored in one or more files in a "hive partitioned" directory structure:
So for example, given files like this:
You can create a table with a command like
CREATE EXTERNAL TABLE my_table LOCATION '/path/to/my_table'
And the ListingTable will handle figuring out schema, and running queries against those files as though they were a single table.
Describe the solution you'd like
Here are some things I suspect could be improved:
All Formats
Object store list caching
For large tables (many files) on remote stores, the actual object store call to LIST may be non-trivially expensive, and thus doing it over and over is expensive
@henrifroese points out a similar thing for pruning partitions #9654
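A list cache along these lines can be sketched as a small wrapper around the listing call. This Python sketch is illustrative only: `ListingCache`, `list_fn`, and `ttl_seconds` are hypothetical names, and `list_fn` stands in for whatever function performs the remote LIST.

```python
import time


class ListingCache:
    """Caches the result of listing a prefix so repeated queries skip the LIST call."""

    def __init__(self, list_fn, ttl_seconds=None, clock=time.monotonic):
        self._list_fn = list_fn
        self._ttl = ttl_seconds  # None means entries never expire on their own
        self._clock = clock
        self._entries = {}  # prefix -> (time stored, tuple of paths)

    def list(self, prefix):
        now = self._clock()
        cached = self._entries.get(prefix)
        if cached is not None:
            stored_at, paths = cached
            if self._ttl is None or now - stored_at < self._ttl:
                return list(paths)
        paths = tuple(self._list_fn(prefix))  # the expensive remote call
        self._entries[prefix] = (now, paths)
        return list(paths)

    def invalidate(self, prefix=None):
        """Drop one cached prefix, or everything when prefix is None."""
        if prefix is None:
            self._entries.clear()
        else:
            self._entries.pop(prefix, None)


# Hypothetical usage with a fake store and a controllable clock.
list_calls = []
now = [0.0]


def fake_list(prefix):
    list_calls.append(prefix)
    return [f"{prefix}/a.parquet", f"{prefix}/b.parquet"]


listing = ListingCache(fake_list, ttl_seconds=10, clock=lambda: now[0])
listing.list("t")
listing.list("t")  # served from the cache
print(len(list_calls))
```

The cost of this design is staleness: files added to the store after the first listing are not visible until the entry expires or is invalidated, so the expiry policy is a user-facing choice rather than an implementation detail.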
Parquet Specific
MetaData caching
ListingTable (code link) prunes files based on statistics, and then the ParquetExec itself (link) again prunes row groups and data pages based on metadata. Fetching and parsing this metadata twice (once to prune files and once to prune row groups) could be improved
IO granularity
I have heard it said that the DataFusion ParquetExec reader reads a page at a time -- this is fine if the parquet file is a local file on disk, but it is likely quite inefficient if each page must be fetched with an individual remote object store request. This assertion needs to be researched, but if true we could make queries on remote parquet files much faster by making fewer larger requests
Describe alternatives you've considered
@Ted-Jiang added some APIs in #7570 https://github.com/apache/arrow-datafusion/blob/2b0a7db0ce64950864e07edaddfa80756fe0ffd5/datafusion/execution/src/cache/mod.rs but there aren't any default implementations in DataFusion so the metadata is read multiple times
Maybe we can add a default implementation of the caches in SessionContext with a simple policy (like LRU / some max size)
Another potential way to improve performance is to cache the decoded metadata from the Parquet footer rather than checking it once to prune files and then again to prune row groups / pages. This could be taken even farther with pruning files and row groups and pages in one go using an API like #9929
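The "one go" idea can be illustrated with a small Python sketch that loads each footer once and derives both the file-level and the row-group-level decision from it. This is not the API proposed in #9929: `RowGroupStats`, `FileMetadata`, `plan_scan`, and the single integer column with min/max statistics are all simplifying assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RowGroupStats:
    min_value: int
    max_value: int


@dataclass(frozen=True)
class FileMetadata:
    """Stand-in for a decoded Parquet footer: one stats entry per row group."""
    row_groups: tuple


def plan_scan(paths, load_metadata, lo, hi):
    """Return {path: [row group indexes]} for row groups that may hold values in [lo, hi].

    Each footer is loaded exactly once. A file with no surviving row group is
    pruned entirely, so file pruning falls out of row group pruning.
    """
    plan = {}
    for path in paths:
        metadata = load_metadata(path)  # single fetch + decode per file
        keep = [
            index
            for index, stats in enumerate(metadata.row_groups)
            if stats.max_value >= lo and stats.min_value <= hi
        ]
        if keep:
            plan[path] = keep
    return plan


# Hypothetical footers for two files.
footers = {
    "a.parquet": FileMetadata((RowGroupStats(0, 9), RowGroupStats(10, 19), RowGroupStats(20, 29))),
    "b.parquet": FileMetadata((RowGroupStats(100, 199),)),
}
loads = []


def load(path):
    loads.append(path)
    return footers[path]


scan_plan = plan_scan(["a.parquet", "b.parquet"], load, lo=15, hi=25)
print(scan_plan, loads)
```

Here `b.parquet` is dropped without a second metadata read, and `a.parquet` keeps only the row groups whose ranges overlap the predicate. Page-level pruning would follow the same pattern one level further down.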
Additional context
@matthewmturner mentioned interest in improving listing table performance: #9899 (comment)
Note we don't use ListingTable in InfluxDB for some of the reasons described above
Related tickets:
ParquetExec (enable custom indexes, finer grained pushdown) #9929