Fix dask_cudf.read_csv #17612

Merged · 2 commits · Dec 17, 2024
36 changes: 25 additions & 11 deletions python/dask_cudf/dask_cudf/backends.py
@@ -714,21 +714,35 @@ def read_csv(
        storage_options=None,
        **kwargs,
    ):
-        import dask_expr as dx
-        from fsspec.utils import stringify_path
+        try:
+            # TODO: Remove when cudf is pinned to dask>2024.12.0
Member commented:

Does it make sense for us to do this? rapids-dask-dependency should always pick the latest Dask, no?

Member (Author) replied:

By "when cudf is pinned", I mean "when rapids-dask-dependency is pinned". Is that your question?

rapids-dask-dependency currently pins to >=2024.11.2. This means another package with a dask<2024.12.0 requirement can still give us dask-2024.11.2 in practice, no?
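For readers following the pinning discussion, here is a small hypothetical sketch (not part of the PR) of an explicit version gate equivalent to the try/except ImportError probe below; it assumes the packaging library is available, and the 2024.12.0 cutoff comes from the comments in the diff itself.

# Hypothetical sketch, not in the PR: an explicit version gate mirroring
# the try/except ImportError probe in read_csv below.
from packaging.version import Version

import dask

# Per the TODO in the diff, the dask_expr ReadCSV path is only needed while
# rapids-dask-dependency may still resolve to dask<=2024.12.0.
USE_DASK_EXPR_READCSV = Version(dask.__version__) <= Version("2024.12.0")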

-        if not isinstance(path, str):
-            path = stringify_path(path)
-        return dx.new_collection(
-            dx.io.csv.ReadCSV(
-                path,
-                dtype_backend=dtype_backend,
-                storage_options=storage_options,
-                kwargs=kwargs,
-                header=header,
-                dataframe_backend="cudf",
-            )
-        )
+            import dask_expr as dx
+            from dask_expr.io.csv import ReadCSV
+            from fsspec.utils import stringify_path
+
+            if not isinstance(path, str):
+                path = stringify_path(path)
+            return dx.new_collection(
+                ReadCSV(
+                    path,
+                    dtype_backend=dtype_backend,
+                    storage_options=storage_options,
+                    kwargs=kwargs,
+                    header=header,
+                    dataframe_backend="cudf",
+                )
+            )
+        except ImportError:
+            # Requires dask>2024.12.0
+            from dask_cudf.io.csv import read_csv
+
+            return read_csv(
+                path,
+                *args,
+                header=header,
+                storage_options=storage_options,
+                **kwargs,
+            )

    @staticmethod
    def read_json(*args, **kwargs):
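For orientation (not part of the diff), a hedged usage sketch of how this entrypoint is typically reached: through Dask's dataframe backend dispatch rather than by calling the method directly. The file path is hypothetical; "dataframe.backend" is the standard Dask config option for selecting cudf-backed collections.

# Usage sketch (hypothetical paths). With the "cudf" dataframe backend
# selected, dask.dataframe.read_csv dispatches to the read_csv entrypoint
# patched above, which in turn picks the dask_expr or dask_cudf reader.
import dask
import dask.dataframe as dd

with dask.config.set({"dataframe.backend": "cudf"}):
    df = dd.read_csv("data/*.csv", blocksize="256 MiB")
    print(df.head())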
195 changes: 190 additions & 5 deletions python/dask_cudf/dask_cudf/io/csv.py
Member (Author) commented:

NOTE: This code was translated from the "legacy" code in dask_cudf/_legacy/io/csv.py. After this PR is merged, that file can be removed in #17558.

@@ -1,8 +1,193 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

-from dask_cudf import _deprecated_api
-
-read_csv = _deprecated_api(
-    "dask_cudf.io.csv.read_csv",
-    new_api="dask_cudf.read_csv",
-)

import os
from glob import glob
from warnings import warn

from fsspec.utils import infer_compression

from dask import dataframe as dd
from dask.dataframe.io.csv import make_reader
from dask.utils import parse_bytes

import cudf


def read_csv(path, blocksize="default", **kwargs):
    """
    Read CSV files into a :class:`.DataFrame`.

    This API parallelizes the :func:`cudf:cudf.read_csv` function in
    the following ways:

    It supports loading many files at once using globstrings:

    >>> import dask_cudf
    >>> df = dask_cudf.read_csv("myfiles.*.csv")

    In some cases it can break up large files:
Member commented:

What cases are those? Or is it always dependent upon the file size and the value of blocksize? If the latter, maybe we should just rephrase to "It can also break up large files by specifying the size of each block via blocksize".

Member (Author) replied:

Sorry, I didn't spend any time reviewing these doc-strings, because they were directly copied from dask_cudf/_legacy/io/csv.py. Does it make sense to address these suggestions/questions in a follow-up (just to make sure CI is unblocked)?


    >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")

    It can read CSV files from external resources (e.g. S3, HTTP, FTP)
Member suggested:

-    It can read CSV files from external resources (e.g. S3, HTTP, FTP)
+    It can read CSV files from external resources (e.g. S3, HTTP, FTP):


    >>> df = dask_cudf.read_csv("s3://bucket/myfiles.*.csv")
    >>> df = dask_cudf.read_csv("https://www.mycloud.com/sample.csv")

    Internally ``read_csv`` uses :func:`cudf:cudf.read_csv` and
    supports many of the same keyword arguments with the same
    performance guarantees. See the docstring for
    :func:`cudf:cudf.read_csv` for more information on available
    keyword arguments.

    Parameters
    ----------
    path : str, path object, or file-like object
        Either a path to a file (a str, :py:class:`pathlib.Path`, or
        py._path.local.LocalPath), URL (including http, ftp, and S3
Member suggested:

-        py._path.local.LocalPath), URL (including http, ftp, and S3
+        ``py._path.local.LocalPath``), URL (including HTTP, FTP, and S3

Member commented:

Maybe it could also just be :py:class:`py._path.local.LocalPath`?

        locations), or any object with a read() method (such as
Member suggested:

-        locations), or any object with a read() method (such as
+        locations), or any object with a ``read()`` method (such as

        builtin :py:func:`open` file handler function or
        :py:class:`~io.StringIO`).
    blocksize : int or str, default "256 MiB"
        The target task partition size. If ``None``, a single block
        is used for each file.
    **kwargs : dict
        Passthrough key-word arguments that are sent to
Member suggested:

-        Passthrough key-word arguments that are sent to
+        Passthrough keyword arguments that are sent to

        :func:`cudf:cudf.read_csv`.

    Notes
    -----
    If any of `skipfooter`/`skiprows`/`nrows` are passed,
    `blocksize` will default to None.

    Examples
    --------
    >>> import dask_cudf
    >>> ddf = dask_cudf.read_csv("sample.csv", usecols=["a", "b"])
    >>> ddf.compute()
       a      b
    0  1     hi
    1  2  hello
    2  3     ai

    """
    # Set default `blocksize`
    if blocksize == "default":
        if (
            kwargs.get("skipfooter", 0) != 0
            or kwargs.get("skiprows", 0) != 0
            or kwargs.get("nrows", None) is not None
        ):
            # Cannot read in blocks if skipfooter,
            # skiprows or nrows is passed.
            blocksize = None
        else:
            blocksize = "256 MiB"

    if "://" in str(path):
        func = make_reader(cudf.read_csv, "read_csv", "CSV")
        return func(path, blocksize=blocksize, **kwargs)
    else:
        return _internal_read_csv(path=path, blocksize=blocksize, **kwargs)


def _internal_read_csv(path, blocksize="256 MiB", **kwargs):
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)

    if isinstance(path, list):
        filenames = path
    elif isinstance(path, str):
        filenames = sorted(glob(path))
    elif hasattr(path, "__fspath__"):
        filenames = sorted(glob(path.__fspath__()))
    else:
        raise TypeError(f"Path type not understood:{type(path)}")

    if not filenames:
        msg = f"A file in: {filenames} does not exist."
        raise FileNotFoundError(msg)

    compression = kwargs.get("compression", "infer")

    if compression == "infer":
        # Infer compression from first path by default
        compression = infer_compression(filenames[0])

    if compression and blocksize:
        # compressed CSVs reading must read the entire file
        kwargs.pop("byte_range", None)
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None to remove this message``\n"
            "Setting ``blocksize=(size of file)``" % compression
        )
        blocksize = None

    if blocksize is None:
        return read_csv_without_blocksize(path, **kwargs)

    # Let dask.dataframe generate meta
    dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV")
    kwargs1 = kwargs.copy()
    usecols = kwargs1.pop("usecols", None)
    dtype = kwargs1.pop("dtype", None)
    meta = dask_reader(filenames[0], **kwargs1)._meta
    names = meta.columns
    if usecols or dtype:
        # Regenerate meta with original kwargs if
        # `usecols` or `dtype` was specified
        meta = dask_reader(filenames[0], **kwargs)._meta

    i = 0
    path_list = []
    kwargs_list = []
    for fn in filenames:
        size = os.path.getsize(fn)
        for start in range(0, size, blocksize):
            kwargs2 = kwargs.copy()
            kwargs2["byte_range"] = (
                start,
                blocksize,
            )  # specify which chunk of the file we care about
            if start != 0:
                kwargs2["names"] = names  # no header in the middle of the file
                kwargs2["header"] = None
            path_list.append(fn)
            kwargs_list.append(kwargs2)
            i += 1

    return dd.from_map(_read_csv, path_list, kwargs_list, meta=meta)


def _read_csv(fn, kwargs):
    return cudf.read_csv(fn, **kwargs)


def read_csv_without_blocksize(path, **kwargs):
    """Read entire CSV with optional compression (gzip/zip)

    Parameters
    ----------
    path : str
        path to files (support for glob)
    """
    if isinstance(path, list):
        filenames = path
    elif isinstance(path, str):
        filenames = sorted(glob(path))
    elif hasattr(path, "__fspath__"):
        filenames = sorted(glob(path.__fspath__()))
    else:
        raise TypeError(f"Path type not understood:{type(path)}")

    meta_kwargs = kwargs.copy()
    if "skipfooter" in meta_kwargs:
        meta_kwargs.pop("skipfooter")
    if "nrows" in meta_kwargs:
        meta_kwargs.pop("nrows")
    # Read "head" of first file (first 5 rows).
    # Convert to empty df for metadata.
    meta = cudf.read_csv(filenames[0], nrows=5, **meta_kwargs).iloc[:0]
    return dd.from_map(cudf.read_csv, filenames, meta=meta, **kwargs)
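To make the partitioning logic in _internal_read_csv above easier to follow, here is a small self-contained sketch (not part of the PR) of how one task per byte range gets planned; the helper name plan_byte_ranges is hypothetical.

import os

def plan_byte_ranges(filenames, blocksize):
    # One (file, byte_range, needs_header_override) entry per task, mirroring
    # the nested loop in _internal_read_csv: every chunk except the first in a
    # file must be read with explicit names and header=None, since the CSV
    # header only appears at the top of the file.
    plan = []
    for fn in filenames:
        size = os.path.getsize(fn)
        for start in range(0, size, blocksize):
            plan.append((fn, (start, blocksize), start != 0))
    return plan

# Example: a 600 MiB file with blocksize=256 MiB yields three tasks covering
# byte ranges starting at 0, 256 MiB, and 512 MiB.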
9 changes: 0 additions & 9 deletions python/dask_cudf/dask_cudf/io/tests/test_csv.py
@@ -185,11 +185,6 @@ def test_read_csv_blocksize_none(tmp_path, compression, size):
    df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ)
    dd.assert_eq(df, df2)

-    # Test chunksize deprecation
-    with pytest.warns(FutureWarning, match="deprecated"):
-        df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
-    dd.assert_eq(df, df3)
Member (Author) commented on lines -188 to -191:

This was deprecated a long time ago; the new code path doesn't try to catch this anymore.



@pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None])
def test_csv_reader_usecols(tmp_path, dtype):
@@ -275,7 +270,3 @@ def test_deprecated_api_paths(tmp_path):
    with pytest.warns(match="dask_cudf.io.read_csv is now deprecated"):
        df2 = dask_cudf.io.read_csv(csv_path)
    dd.assert_eq(df, df2, check_divisions=False)

-    with pytest.warns(match="dask_cudf.io.csv.read_csv is now deprecated"):
-        df2 = dask_cudf.io.csv.read_csv(csv_path)
-    dd.assert_eq(df, df2, check_divisions=False)
Member (Author) commented on lines -279 to -281:

The new csv code now lives at this "expected" path.
