Add experimental filesystem="arrow" support in dask_cudf.read_parquet #16684

Merged: 49 commits merged into branch-24.10 from dask-cudf-arrow-filesystem on Sep 25, 2024
Changes from 37 commits
Commits (49)
469bc5e
allow pyarrow-based read with cudf backend
rjzamora Aug 27, 2024
f20cc25
re-org
rjzamora Aug 27, 2024
8f0f598
temporary change for debugging
rjzamora Aug 28, 2024
64fd701
adjust for upstream bug
rjzamora Aug 28, 2024
8e0c902
remove stale comment
rjzamora Aug 28, 2024
18e1c08
add file aggregation
rjzamora Aug 28, 2024
5215a05
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 29, 2024
c51a7bb
test coverage
rjzamora Aug 29, 2024
b7a90c1
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 29, 2024
43274e2
allow aggregate_files=True
rjzamora Aug 30, 2024
63c3f04
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Aug 30, 2024
a1bd43c
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 30, 2024
e3ca47f
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 3, 2024
12c09a5
fix test
rjzamora Sep 3, 2024
daee7ec
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 4, 2024
d068103
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 4, 2024
257eb26
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 5, 2024
ec38b1e
Make isinstance check pass for proxy ndarrays (#16601)
Matt711 Sep 5, 2024
853c76b
Performance improvement for strings::slice for wide strings (#16574)
davidwendt Sep 5, 2024
bdd2bab
skip for pyarrow<15
rjzamora Sep 6, 2024
d943d8d
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 6, 2024
eb9eee0
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 10, 2024
b9c5147
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 10, 2024
ec04e78
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 18, 2024
e391789
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 19, 2024
e154d01
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 19, 2024
3246d67
Intentionally leak thread_local CUDA resources to avoid crash (part 1…
kingcrimsontianyu Sep 19, 2024
2f424f2
Access Frame attributes instead of ColumnAccessor attributes when ava…
mroeschke Sep 19, 2024
362195d
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 24, 2024
4ce83d4
isolate experimental code path
rjzamora Sep 24, 2024
4d87013
remove unnecessary logic
rjzamora Sep 24, 2024
e5b272a
remove unnecessary logic - forgot to save
rjzamora Sep 24, 2024
8d87c54
add warning
rjzamora Sep 24, 2024
8cfe71e
remove blocksize and aggregate_files handling
rjzamora Sep 24, 2024
badf359
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 24, 2024
4c1c5ae
warn rather than raise for blocksize
rjzamora Sep 24, 2024
3f1d925
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
galipremsagar Sep 24, 2024
8c267c7
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 25, 2024
91d2d77
address code review from mads
rjzamora Sep 25, 2024
239639f
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 25, 2024
c944a52
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
galipremsagar Sep 25, 2024
791a4fd
more cleanup
rjzamora Sep 25, 2024
4c5ee6d
remove warning and add note to best-practices
rjzamora Sep 25, 2024
4d28db7
Build `cudf-polars` with `build.sh` (#16898)
brandon-b-miller Sep 25, 2024
9aa5aca
Fix DataFrame.drop(columns=cudf.Series/Index, axis=1) (#16712)
mroeschke Sep 25, 2024
42a15ee
[DOC] Update Pylibcudf doc strings (#16810)
Matt711 Sep 25, 2024
2c5bb57
Optimization of tdigest merge aggregation. (#16780)
nvdbaranec Sep 25, 2024
ed19b2e
Display deltas for `cudf.pandas` test summary (#16864)
galipremsagar Sep 25, 2024
aa492f5
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 25, 2024
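
For orientation, a minimal usage sketch of the option this PR introduces (the bucket and path are placeholders; per the changes below, the "arrow" path requires pyarrow>=15 and the distributed package to be importable):

import dask_cudf

# Default behavior: fsspec-based IO (unchanged by this PR).
ddf = dask_cudf.read_parquet("s3://my-bucket/data/", filesystem="fsspec")

# Experimental: PyArrow-based IO, mainly beneficial for remote storage such as S3.
ddf = dask_cudf.read_parquet("s3://my-bucket/data/", filesystem="arrow")

print(ddf.head())
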
144 changes: 138 additions & 6 deletions python/dask_cudf/dask_cudf/backends.py
@@ -8,6 +8,7 @@
import numpy as np
import pandas as pd
import pyarrow as pa
from packaging.version import Version
from pandas.api.types import is_scalar

import dask.dataframe as dd
@@ -52,6 +53,10 @@
get_parallel_type.register(cudf.BaseIndex, lambda _: Index)


# Required for Arrow filesystem support in read_parquet
PYARROW_GE_15 = Version(pa.__version__) >= Version("15.0.0")


@meta_nonempty.register(cudf.BaseIndex)
@_dask_cudf_performance_tracking
def _nonempty_index(idx):
@@ -695,15 +700,142 @@ def from_dict(
)

@staticmethod
def read_parquet(*args, engine=None, **kwargs):
def read_parquet(path, filesystem="fsspec", engine=None, **kwargs):
import dask_expr as dx
import fsspec

from dask_cudf.io.parquet import CudfEngine
if (
isinstance(filesystem, fsspec.AbstractFileSystem)
or isinstance(filesystem, str)
and filesystem.lower() == "fsspec"
):
# Default "fsspec" filesystem
from dask_cudf.io.parquet import CudfEngine

_raise_unsupported_parquet_kwargs(**kwargs)
return _default_backend(
dx.read_parquet, *args, engine=CudfEngine, **kwargs
)
_raise_unsupported_parquet_kwargs(**kwargs)
return _default_backend(
dx.read_parquet,
path,
filesystem=filesystem,
engine=CudfEngine,
**kwargs,
)

else:
# EXPERIMENTAL filesystem="arrow" support.
# This code path uses PyArrow for IO, which is only
# beneficial for remote storage (e.g. S3)

# CudfReadParquetPyarrowFS requires distributed
# (See: https://github.com/dask/dask/issues/11352)
from fsspec.utils import stringify_path
from pyarrow import fs as pa_fs

import distributed # noqa: F401
from dask.core import flatten
from dask.dataframe.utils import pyarrow_strings_enabled

from dask_cudf.expr._expr import CudfReadParquetPyarrowFS

if not PYARROW_GE_15:
raise RuntimeError(
"Experimental Arrow filesystem support requires pyarrow>=15"
)

if not (
isinstance(filesystem, pa_fs.FileSystem)
or isinstance(filesystem, str)
and filesystem.lower() in ("arrow", "pyarrow")
):
raise ValueError(f"Unexpected filesystem value: {filesystem}.")

warnings.warn(
f"Support for `filesystem={filesystem}` is experimental. "
"Using PyArrow to perform IO on multiple CPU threads. "
"Behavior may change in the future (without deprecation). "
)
Review comment (Contributor): question: Do we really want a user-visible (?) warning here? If curator uses this, it would presumably mean end users would see this warning but not know what to do about it (or have any facility to).

Reply (Member Author, rjzamora): @ayushdg @VibhuJawa - What are your feelings on this? I agree that this is probably a bit annoying :)

Reply (Member Author, rjzamora): Okay - I added a note to the best-practices docs about this feature, and removed the explicit warning.

if not isinstance(path, str):
path = stringify_path(path)

# Extract kwargs
columns = kwargs.pop("columns", None)
filters = kwargs.pop("filters", None)
categories = kwargs.pop("categories", None)
index = kwargs.pop("index", None)
storage_options = kwargs.pop("storage_options", None)
dtype_backend = kwargs.pop("dtype_backend", None)
calculate_divisions = kwargs.pop("calculate_divisions", False)
ignore_metadata_file = kwargs.pop("ignore_metadata_file", False)
metadata_task_size = kwargs.pop("metadata_task_size", None)
split_row_groups = kwargs.pop("split_row_groups", "infer")
blocksize = kwargs.pop("blocksize", "default")
aggregate_files = kwargs.pop("aggregate_files", None)
parquet_file_extension = kwargs.pop(
"parquet_file_extension", (".parq", ".parquet", ".pq")
)
arrow_to_pandas = kwargs.pop("arrow_to_pandas", None)
open_file_options = kwargs.pop("open_file_options", None)

# Validate and normalize kwargs
kwargs["dtype_backend"] = dtype_backend
if arrow_to_pandas is not None:
raise ValueError(
"arrow_to_pandas not supported for the 'cudf' backend."
)
if open_file_options is not None:
raise ValueError(
"The open_file_options argument is no longer supported "
"by the 'cudf' backend."
)
if filters is not None:
for filter in flatten(filters, container=list):
_, op, val = filter
if op == "in" and not isinstance(val, (set, list, tuple)):
raise TypeError(
"Value of 'in' filter must be a list, set or tuple."
)
if metadata_task_size is not None:
raise NotImplementedError(
"metadata_task_size is not supported when using the pyarrow filesystem."
)
if split_row_groups != "infer":
raise NotImplementedError(
"split_row_groups is not supported when using the pyarrow filesystem."
)
if parquet_file_extension != (".parq", ".parquet", ".pq"):
raise NotImplementedError(
"parquet_file_extension is not supported when using the pyarrow filesystem."
)
if blocksize is not None and blocksize != "default":
warnings.warn(
"blocksize is not supported when using the pyarrow filesystem."
"blocksize argument will be ignored."
)
if aggregate_files is not None:
warnings.warn(
"aggregate_files is not supported when using the pyarrow filesystem. "
"Please use the 'dataframe.parquet.minimum-partition-size' config."
"aggregate_files argument will be ignored."
)

return dx.new_collection(
CudfReadParquetPyarrowFS(
path,
columns=dx._util._convert_to_list(columns),
filters=filters,
categories=categories,
index=index,
calculate_divisions=calculate_divisions,
storage_options=storage_options,
filesystem=filesystem,
ignore_metadata_file=ignore_metadata_file,
arrow_to_pandas=arrow_to_pandas,
pyarrow_strings_enabled=pyarrow_strings_enabled(),
kwargs=kwargs,
_series=isinstance(columns, str),
)
)

@staticmethod
def read_csv(
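
A rough sketch of the alternatives suggested by the new code path above: the dask config option named in the aggregate_files warning, and passing an explicit PyArrow filesystem object in place of the "arrow" string (bucket, region, and the 128 MiB value are illustrative):

import dask
import dask_cudf
from pyarrow import fs as pa_fs

# The aggregate_files warning above directs users to this config option;
# the value is in bytes and purely illustrative.
dask.config.set({"dataframe.parquet.minimum-partition-size": 128 * 1024**2})

# An explicit pyarrow FileSystem instance is also accepted in place of "arrow".
ddf = dask_cudf.read_parquet(
    "s3://my-bucket/data/",
    filesystem=pa_fs.S3FileSystem(region="us-east-2"),
)
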
89 changes: 89 additions & 0 deletions python/dask_cudf/dask_cudf/expr/_expr.py
@@ -2,10 +2,13 @@
import functools

import dask_expr._shuffle as _shuffle_module
import pandas as pd
from dask_expr import new_collection
from dask_expr._cumulative import CumulativeBlockwise
from dask_expr._expr import Elemwise, Expr, VarColumns
from dask_expr._reductions import Reduction, Var
from dask_expr.io.io import FusedParquetIO
from dask_expr.io.parquet import ReadParquetPyarrowFS

from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty
from dask.dataframe.dispatch import is_categorical_dtype
@@ -17,6 +20,92 @@
##


class CudfFusedParquetIO(FusedParquetIO):
@staticmethod
def _load_multiple_files(
frag_filters,
columns,
schema,
*to_pandas_args,
):
import pyarrow as pa

from dask.base import apply, tokenize
from dask.threaded import get

token = tokenize(frag_filters, columns, schema)
name = f"pq-file-{token}"
dsk = {
(name, i): (
CudfReadParquetPyarrowFS._fragment_to_table,
frag,
filter,
columns,
schema,
)
for i, (frag, filter) in enumerate(frag_filters)
}
dsk[name] = (
apply,
pa.concat_tables,
[list(dsk.keys())],
{"promote_options": "permissive"},
)
return CudfReadParquetPyarrowFS._table_to_pandas(
get(dsk, name),
*to_pandas_args,
)


class CudfReadParquetPyarrowFS(ReadParquetPyarrowFS):
@functools.cached_property
def _dataset_info(self):
from dask_cudf.io.parquet import set_object_dtypes_from_pa_schema

dataset_info = super()._dataset_info
meta_pd = dataset_info["base_meta"]
if isinstance(meta_pd, cudf.DataFrame):
return dataset_info

# Convert to cudf
# (drop unsupported timezone information)
for k, v in meta_pd.dtypes.items():
if isinstance(v, pd.DatetimeTZDtype) and v.tz is not None:
meta_pd[k] = meta_pd[k].dt.tz_localize(None)
meta_cudf = cudf.from_pandas(meta_pd)

# Re-set "object" dtypes to align with pa schema
kwargs = dataset_info.get("kwargs", {})
set_object_dtypes_from_pa_schema(
meta_cudf,
kwargs.get("schema", None),
)

dataset_info["base_meta"] = meta_cudf
self.operands[type(self)._parameters.index("_dataset_info_cache")] = (
dataset_info
)
return dataset_info

@staticmethod
def _table_to_pandas(
table,
index_name,
*args,
):
df = cudf.DataFrame.from_arrow(table)
if index_name is not None:
df = df.set_index(index_name)
return df

def _tune_up(self, parent):
if self._fusion_compression_factor >= 1:
return
if isinstance(parent, CudfFusedParquetIO):
return
return parent.substitute(self, CudfFusedParquetIO(self))


class ToCudfBackend(Elemwise):
# TODO: Inherit from ToBackend when rapids-dask-dependency
# is pinned to dask>=2024.8.1
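
A small sketch (with made-up data) of the per-partition conversion that CudfReadParquetPyarrowFS._table_to_pandas performs above: an Arrow table produced by the PyArrow reader becomes a cudf DataFrame, with the index column restored when one was recorded:

import pyarrow as pa
import cudf

# Stand-in for a table returned by the PyArrow parquet reader.
table = pa.table({"idx": [0, 1, 2], "a": [1.0, 2.0, 3.0]})

# Convert directly to cudf (no pandas detour), then restore the index column.
df = cudf.DataFrame.from_arrow(table)
df = df.set_index("idx")
print(df)
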
46 changes: 33 additions & 13 deletions python/dask_cudf/dask_cudf/io/tests/test_s3.py
@@ -12,6 +12,7 @@
from dask.dataframe import assert_eq

import dask_cudf
from dask_cudf.tests.utils import QUERY_PLANNING_ON

moto = pytest.importorskip("moto", minversion="3.1.6")
boto3 = pytest.importorskip("boto3")
@@ -127,29 +128,48 @@ def test_read_parquet_open_file_options_raises():
)


def test_read_parquet_filesystem(s3_base, s3so, pdf):
@pytest.mark.parametrize(
"filesystem",
[
pytest.param(
"arrow",
marks=pytest.mark.skipif(
not QUERY_PLANNING_ON or not dask_cudf.backends.PYARROW_GE_15,
reason="Not supported",
),
),
"fsspec",
],
)
def test_read_parquet_filesystem(s3_base, s3so, pdf, filesystem):
fname = "test_parquet_filesystem.parquet"
bucket = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
path = f"s3://{bucket}/{fname}"

# Cannot pass filesystem="arrow"
with pytest.raises(ValueError):
dask_cudf.read_parquet(
if filesystem == "arrow":
# This feature requires arrow >= 15
pytest.importorskip("pyarrow", minversion="15.0.0")

import pyarrow.fs as pa_fs

with pytest.warns(match="experimental"):
df = dask_cudf.read_parquet(
path,
filesystem=pa_fs.S3FileSystem(
endpoint_override=s3so["client_kwargs"][
"endpoint_url"
],
),
)
else:
df = dask_cudf.read_parquet(
path,
storage_options=s3so,
filesystem="arrow",
filesystem=filesystem,
)

# Can pass filesystem="fsspec"
df = dask_cudf.read_parquet(
path,
storage_options=s3so,
filesystem="fsspec",
)
assert df.b.sum().compute() == 9


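
For reference, a sketch of the two filesystem configurations the parametrized test above exercises against the mocked S3 endpoint (the endpoint URL and path are placeholders for what the fixtures provide):

import dask_cudf
from pyarrow import fs as pa_fs

endpoint_url = "http://127.0.0.1:5555"  # placeholder for s3so["client_kwargs"]["endpoint_url"]
path = "s3://parquet/test_parquet_filesystem.parquet"

# filesystem="arrow": point PyArrow's S3 filesystem at the mock server.
df_arrow = dask_cudf.read_parquet(
    path,
    filesystem=pa_fs.S3FileSystem(endpoint_override=endpoint_url),
)

# filesystem="fsspec": the same endpoint is forwarded via storage_options instead.
df_fsspec = dask_cudf.read_parquet(
    path,
    storage_options={"client_kwargs": {"endpoint_url": endpoint_url}},
    filesystem="fsspec",
)
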