Commit
Add blocksize to DocumentDataset.read_* that uses dask_cudf.read_* (#285)

* fc

Signed-off-by: Praateek <[email protected]>

* review comments

Signed-off-by: Praateek <[email protected]>

* make blocksize work with parquet

Signed-off-by: Praateek <[email protected]>

* filetype

Signed-off-by: Praateek <[email protected]>

* fix merge

Signed-off-by: Praateek <[email protected]>

* add test cases

Signed-off-by: Praateek <[email protected]>

* add test file

Signed-off-by: Praateek <[email protected]>

* failing test for select_columns

Signed-off-by: Praateek <[email protected]>

* rename func name

Signed-off-by: Praateek <[email protected]>

* add test case for different columns

Signed-off-by: Praateek <[email protected]>

* improve test for different_cols

Signed-off-by: Praateek <[email protected]>

* ..

Signed-off-by: Praateek <[email protected]>

* review comments + add warnings for inconsistent schemas

Signed-off-by: Praateek <[email protected]>

* Update nemo_curator/utils/distributed_utils.py

Co-authored-by: Sarah Yurick <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update nemo_curator/utils/distributed_utils.py

Co-authored-by: Sarah Yurick <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update nemo_curator/utils/distributed_utils.py

Co-authored-by: Sarah Yurick <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update nemo_curator/utils/distributed_utils.py

Co-authored-by: Sarah Yurick <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update nemo_curator/utils/distributed_utils.py

Co-authored-by: Sarah Yurick <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update nemo_curator/utils/distributed_utils.py

Co-authored-by: Sarah Yurick <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update nemo_curator/utils/distributed_utils.py

Co-authored-by: Sarah Yurick <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* fix tests

Signed-off-by: Praateek <[email protected]>

---------

Signed-off-by: Praateek <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>
Co-authored-by: Sarah Yurick <[email protected]>
praateekmahajan and sarahyurick authored Dec 17, 2024
1 parent c54826a commit e820b8b
Showing 7 changed files with 814 additions and 56 deletions.
11 changes: 11 additions & 0 deletions docs/user-guide/bestpractices.rst
@@ -24,6 +24,16 @@ Handling GPU Out-of-Memory (OOM) Errors
NeMo Curator is designed to be scalable with large amounts of text data, but OOM errors occur when the available GPU memory is insufficient for a given task.
To help avoid these issues and ensure efficient processing, here are some strategies for managing memory usage and mitigating OOM challenges.

Controlling Partition Sizes
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Consider using ``files_per_partition`` or ``blocksize`` when reading data. Processing large datasets in smaller chunks helps reduce the memory load; a short usage sketch follows this list.

#. The ``blocksize`` argument is available for ``jsonl`` and ``parquet`` files. However, for ``parquet`` files, it is currently only available when ``add_filename=False``.

#. For the ``blocksize`` parameter, the recommendation is to use 1/32 of the total GPU memory. For example, if you have a GPU with 32GB of memory, you can set ``blocksize="1GB"``.
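
Below is a minimal sketch of the above recommendation. The paths are placeholders, and it assumes ``DocumentDataset`` is imported from ``nemo_curator.datasets``; adjust the backend and sizes to your data and hardware.

.. code-block:: python

    from nemo_curator.datasets import DocumentDataset

    # JSONL: read the dataset in ~1GB partitions instead of one file per partition
    jsonl_dataset = DocumentDataset.read_json(
        "/path/to/jsonl_dir",  # placeholder path
        backend="cudf",
        blocksize="1gb",
    )

    # Parquet: blocksize is currently only supported with add_filename=False
    parquet_dataset = DocumentDataset.read_parquet(
        "/path/to/parquet_dir",  # placeholder path
        backend="cudf",
        blocksize="1gb",
        add_filename=False,
    )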


Utilize RMM Options
~~~~~~~~~~~~~~~~~~~
`RAPIDS Memory Manager (RMM) <https://github.com/rapidsai/rmm>`_ is a package that enables you to allocate device memory in a highly configurable way.
@@ -59,6 +69,7 @@ Alternatively, you can set these flags while initializing your own Dask client,
client = Client(cluster)
Fuzzy Deduplication Guidelines
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Fuzzy deduplication is one of the most computationally expensive algorithms within the NeMo Curator pipeline.
10 changes: 10 additions & 0 deletions nemo_curator/_compat.py
@@ -23,6 +23,15 @@
    # When mocking with autodoc the dask version is not there
    _dask_version = parse_version("2024.06.0")


try:
    import dask_cudf

    _dask_cudf_version = parse_version(dask_cudf.__version__)
except (ImportError, TypeError):
    # When mocking with autodoc the dask_cudf version is not there
    _dask_cudf_version = parse_version("2024.06.0")

try:
    import cudf

@@ -40,6 +49,7 @@
DASK_SHUFFLE_METHOD_ARG = _dask_version > parse_version("2024.1.0")
DASK_P2P_ERROR = _dask_version < parse_version("2023.10.0")
DASK_SHUFFLE_CAST_DTYPE = _dask_version > parse_version("2023.12.0")
DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA = _dask_version > parse_version("2024.12")

# Query-planning check (and cache)
_DASK_QUERY_PLANNING_ENABLED = None
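As a side note, here is a hedged sketch of how a compatibility flag like ``DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA`` is typically consumed. The helper name, warning text, and call site below are illustrative only, not the actual code in nemo_curator/utils/distributed_utils.py.

import warnings

from nemo_curator._compat import DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA


def warn_if_inconsistent_schema(columns_per_file):
    """Hypothetical helper: warn when input parquet files disagree on columns."""
    unique_schemas = {tuple(cols) for cols in columns_per_file}
    if len(unique_schemas) > 1 and DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA:
        # The flag gates the warning because newer dask releases handle
        # mismatched parquet schemas differently than older ones.
        warnings.warn(
            "Input parquet files have inconsistent schemas; "
            "missing columns may be filled with nulls."
        )


# Example: two files with different column sets trigger the warning.
warn_if_inconsistent_schema([["id", "text"], ["id", "text", "lang"]])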
24 changes: 14 additions & 10 deletions nemo_curator/datasets/doc_dataset.py
@@ -50,7 +50,8 @@ def read_json(
cls,
input_files: Union[str, List[str]],
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: int = 1,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename: bool = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
@@ -74,8 +75,9 @@
input_files=input_files,
file_type="jsonl",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
files_per_partition=files_per_partition,
blocksize=blocksize,
input_meta=input_meta,
columns=columns,
**kwargs,
@@ -87,8 +89,9 @@ def read_parquet(
cls,
input_files: Union[str, List[str]],
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: int = 1,
add_filename: bool = False,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename=False,
columns: Optional[List[str]] = None,
**kwargs,
) -> "DocumentDataset":
@@ -109,8 +112,9 @@
input_files=input_files,
file_type="parquet",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
files_per_partition=files_per_partition,
blocksize=blocksize,
columns=columns,
**kwargs,
)
@@ -121,8 +125,6 @@ def read_pickle(
cls,
input_files: Union[str, List[str]],
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: int = 1,
add_filename: bool = False,
columns: Optional[List[str]] = None,
**kwargs,
) -> "DocumentDataset":
@@ -142,8 +144,6 @@
input_files=input_files,
file_type="pickle",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
columns=columns,
**kwargs,
)
@@ -234,8 +234,9 @@ def _read_json_or_parquet(
input_files: Union[str, List[str]],
file_type: str,
backend: Literal["cudf", "pandas"],
files_per_partition: int,
add_filename: bool,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = None,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
@@ -267,6 +268,7 @@
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
@@ -286,6 +288,7 @@
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
@@ -311,6 +314,7 @@
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
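To tie the signature changes above together, here is a hedged usage sketch of the two partitioning options. The file paths are placeholders, and since this diff does not show how the two arguments interact, each call sets only one of them.

from nemo_curator.datasets import DocumentDataset

# Partition by size: roughly 512MB of data per partition.
# For parquet, blocksize currently requires add_filename=False.
by_blocksize = DocumentDataset.read_parquet(
    "/path/to/parquet_dir",  # placeholder path
    backend="cudf",
    blocksize="512mb",
    add_filename=False,
)

# Partition by file count: two input files per partition, no blocksize.
by_file_count = DocumentDataset.read_json(
    ["/data/a.jsonl", "/data/b.jsonl", "/data/c.jsonl", "/data/d.jsonl"],  # placeholders
    backend="pandas",
    files_per_partition=2,
    blocksize=None,
    add_filename=True,
)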
(Diffs for the remaining 4 changed files are not shown.)
