
Add experimental filesystem="arrow" support in dask_cudf.read_parquet #16684

Merged: 49 commits merged into rapidsai:branch-24.10 on Sep 25, 2024

Conversation

rjzamora (Member)

Description

This PR piggybacks on the existing CPU/Arrow Parquet infrastructure in dask-expr. With this PR,

df = dask_cudf.read_parquet(path, filesystem="arrow")

will produce a cudf-backed collection using PyArrow for IO (i.e. disk -> pa.Table -> cudf.DataFrame). Before this PR, passing filesystem="arrow" would simply result in an error.

Although this code path is not ideal for fast/local storage, it can be very efficient for remote storage (e.g. S3).
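
For illustration, a minimal sketch of the intended usage against remote storage (the bucket path is hypothetical, and the `_meta` check is just one convenient way to confirm the collection is cudf-backed):

```python
import dask_cudf

# Hypothetical S3 dataset; any PyArrow-compatible URI should work here.
path = "s3://my-bucket/dataset/*.parquet"

# PyArrow performs the IO (S3 -> pa.Table) on CPU threads, and each
# partition is then converted to a cudf.DataFrame on the GPU.
df = dask_cudf.read_parquet(path, filesystem="arrow")

print(type(df._meta))  # cudf DataFrame metadata => cudf-backed collection
print(df.head())
```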

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@rjzamora rjzamora added feature request New feature or request 2 - In Progress Currently a work in progress dask Dask issue non-breaking Non-breaking change labels Aug 28, 2024
@rjzamora rjzamora self-assigned this Aug 28, 2024
@github-actions github-actions bot added the Python Affects Python cuDF API. label Aug 28, 2024
rapids-bot bot pushed a commit to rapidsai/dask-cuda that referenced this pull request Aug 30, 2024
Adds a new benchmark for Parquet read performance using a `LocalCUDACluster`. The user can pass in `--key` and `--secret` options to specify S3 credentials.

E.g.
```
$ python ./local_read_parquet.py --devs 0,1,2,3,4,5,6,7 --filesystem fsspec --type gpu --file-count 48 --aggregate-files

Parquet read benchmark
--------------------------------------------------------------------------------
Path                      | s3://dask-cudf-parquet-testing/dedup_parquet
Columns                   | None
Backend                   | cudf
Filesystem                | fsspec
Blocksize                 | 244.14 MiB
Aggregate files           | True
Row count                 | 372066
Size on disk              | 1.03 GiB
Number of workers         | 8
================================================================================
Wall clock                | Throughput
--------------------------------------------------------------------------------
36.75 s                   | 28.78 MiB/s
21.29 s                   | 49.67 MiB/s
17.91 s                   | 59.05 MiB/s
================================================================================
Throughput                | 41.77 MiB/s +/- 7.81 MiB/s
Bandwidth                 | 0 B/s +/- 0 B/s
Wall clock                | 25.32 s +/- 8.20 s
================================================================================
...
```

**Notes**:
- S3 performance generally scales with the number of workers (multiplied by the number of threads per worker)
- The example shown above was not executed from an EC2 instance
- The example shown above *should* perform better after rapidsai/cudf#16657
- Using `--filesystem arrow` together with `--type gpu` performs well, but depends on rapidsai/cudf#16684 (see the example invocation below)
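
An example invocation with the Arrow filesystem code path, sketched from the flags shown above (the device list and file count are illustrative, and actual throughput depends on the environment):

```
$ python ./local_read_parquet.py --devs 0,1,2,3,4,5,6,7 --filesystem arrow --type gpu --file-count 48 --aggregate-files
```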

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #1371
@GregoryKimball (Contributor)

@rjzamora , I just heard from @ayushdg that NeMo Curator would benefit from this feature in 24.10. Would you please let me know what steps are needed to complete this work? Who should the reviewers be?

@rjzamora (Member, Author)

> Would you please let me know what steps are needed to complete this work? Who should the reviewers be?

Thanks for the nudge @GregoryKimball - I spent some time today isolating the "experimental" logic used for this feature, so the PR should now be relatively "low risk". I'd like to grab an EC2 instance later today to test that the performance is still good.

The usual dask-cudf and python/IO reviewers include @pentschev @charlesbluca @galipremsagar @wence- @madsbk @quasiben @vyasr - I welcome a review from anyone available :)

@madsbk (Member) left a comment:

Thanks @rjzamora, I only have some minor suggestions

python/dask_cudf/dask_cudf/backends.py (review comments resolved)
@galipremsagar (Contributor) left a comment:

Thanks @rjzamora !

@galipremsagar galipremsagar added 5 - Ready to Merge Testing and reviews complete, ready to merge and removed 3 - Ready for Review Ready for review by team labels Sep 25, 2024
@wence- (Contributor) left a comment:

One question about how noisy we want the "experimental" warning to be

Comment on lines 752 to 756
warnings.warn(
f"Support for `filesystem={filesystem}` is experimental. "
"Using PyArrow to perform IO on multiple CPU threads. "
"Behavior may change in the future (without deprecation). "
)
Contributor:

question: Do we really want a user-visible (?) warning here? If curator uses this, it would presumably mean end users would see this warning but not know what to do about it (or have any facility to).

@rjzamora (Member, Author):

@ayushdg @VibhuJawa - What are your feelings on this? I agree that this is probably a bit annoying :)

@rjzamora (Member, Author):

Okay - I added a note to the best-practices docs about this feature, and removed the explicit warning.

python/dask_cudf/dask_cudf/expr/_expr.py (review comment resolved)
rjzamora and others added 7 commits September 25, 2024 12:36
This PR adds `cudf-polars` to the top-level build script.

Authors:
  - https://github.com/brandon-b-miller
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Jake Awe (https://github.com/AyodeAwe)

URL: rapidsai#16898
Before, when `columns=` was a `cudf.Series`/`Index`, we would call `return array.unique.to_pandas()`, but `.unique` is a method, not a property, so this would have raised an error.

Also took the time to refactor the helper methods here and push down the `errors=` keyword to `Frame._drop_column`.
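
A minimal sketch of the failure mode being fixed (the data is purely illustrative and not taken from the PR's tests):

```python
import cudf

cols = cudf.Series(["a", "a", "b"])

# `.unique` is a method on cudf.Series, not a property, so accessing it
# without parentheses yields a bound-method object...
bound = cols.unique
# ...and calling `.to_pandas()` on that bound method raises AttributeError.
# The corrected call is:
unique_cols = cols.unique().to_pandas()
print(list(unique_cols))  # ['a', 'b']
```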

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)

Approvers:
  - Bradley Dice (https://github.com/bdice)

URL: rapidsai#16712
This PR is a first pass at rapidsai#15937. We will close rapidsai#15937 after rapidsai#15162 is closed.

Authors:
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: rapidsai#16810
Fixes rapidsai#16625

This PR fixes a slow implementation of the centroid merging step during the tdigest merge aggregation. Previously it was doing a linear march over the individual tdigests per group and merging them one by one, which led to terrible performance for large numbers of groups. In principle, though, all this step really needed was a segmented sort of centroid values, so that is what this PR changes it to. The speedup for 1,000,000 input tdigests with 1,000,000 individual groups is ~1000x:

```
Old
---------------------------------------------------------------------------------------------------------------
Benchmark                                                                     Time             CPU   Iterations
---------------------------------------------------------------------------------------------------------------
TDigest/many_tiny_groups/1000000/1/1/10000/iterations:8/manual_time        7473 ms         7472 ms            8
TDigest/many_tiny_groups2/1000000/1/1/1000/iterations:8/manual_time        7433 ms         7431 ms            8
```


```
New
---------------------------------------------------------------------------------------------------------------
Benchmark                                                                     Time             CPU   Iterations
---------------------------------------------------------------------------------------------------------------
TDigest/many_tiny_groups/1000000/1/1/10000/iterations:8/manual_time        6.72 ms         6.79 ms            8
TDigest/many_tiny_groups2/1000000/1/1/1000/iterations:8/manual_time        1.24 ms         1.32 ms            8
```
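
A rough conceptual sketch of the change, written in Python/NumPy rather than libcudf and using synthetic data, purely to illustrate why a single segmented sort can replace per-group linear merging:

```python
import numpy as np

rng = np.random.default_rng(0)
n_groups, digests_per_group, centroids_per_digest = 4, 3, 5
n = n_groups * digests_per_group * centroids_per_digest

# Flattened centroid means, with a group label for every centroid.
means = rng.random(n)
groups = np.repeat(np.arange(n_groups), digests_per_group * centroids_per_digest)

# Old approach (conceptually): for each group, walk its tdigests one by one
# and merge them, which degrades badly as the number of groups grows.
# New approach: one segmented sort by (group, mean); every group's centroids
# come out ordered and ready to be re-clustered in a single pass.
order = np.lexsort((means, groups))
sorted_groups, sorted_means = groups[order], means[order]
```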

Authors:
  - https://github.com/nvdbaranec
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Nghia Truong (https://github.com/ttnghia)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: rapidsai#16780
This PR displays deltas for the CPU and GPU usage metrics that are extracted from `cudf.pandas` pytests.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Jake Awe (https://github.com/AyodeAwe)

URL: rapidsai#16864
@galipremsagar (Contributor)

/merge

@rapids-bot rapids-bot bot merged commit 0425963 into rapidsai:branch-24.10 Sep 25, 2024
96 checks passed
@rjzamora rjzamora deleted the dask-cudf-arrow-filesystem branch September 26, 2024 01:23
rapids-bot bot pushed a commit that referenced this pull request Oct 23, 2024
Follow-up to #16684

There is currently a bug in `dask_cudf.read_parquet(..., filesystem="arrow")` when the files are larger than the `"dataframe.parquet.minimum-partition-size"` configuration value. More specifically, when the files are not aggregated together, the output will be a `pd.DataFrame` instead of a `cudf.DataFrame`.
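
A rough sketch of how the issue might surface (the dataset path and the threshold value are hypothetical; the config key comes from the description above):

```python
import dask
import dask_cudf

# Files above this threshold are not aggregated together
# (the value shown is illustrative).
dask.config.set({"dataframe.parquet.minimum-partition-size": 128_000_000})

ddf = dask_cudf.read_parquet(
    "s3://my-bucket/large-files/*.parquet",
    filesystem="arrow",
)

# Before this follow-up fix, un-aggregated partitions could come back as
# pandas DataFrames instead of cudf DataFrames:
print(type(ddf.partitions[0].compute()))
```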

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #17099