Skip to content

Commit

Permalink
Remove argparse from get_client function signature (#12)
Browse files Browse the repository at this point in the history
* Remove argparse from get_client function signature

Signed-off-by: Ryan Wolf <[email protected]>

* Update scripts and docs

Signed-off-by: Ryan Wolf <[email protected]>

* Whitespace

Signed-off-by: Ryan Wolf <[email protected]>

* Fix style

Signed-off-by: Ryan Wolf <[email protected]>

* Fix peft tutorial client

Signed-off-by: Ryan Wolf <[email protected]>

* Revert read dtype change

Signed-off-by: Ryan Wolf <[email protected]>

* Refactor exact duplicates

Signed-off-by: Ryan Wolf <[email protected]>

* Add root import for get_client

Signed-off-by: Ryan Wolf <[email protected]>

* Update get_client in distributed data classifiers

Signed-off-by: Ryan Wolf <[email protected]>

* Fix naming of args

Signed-off-by: Ryan Wolf <[email protected]>

---------

Signed-off-by: Ryan Wolf <[email protected]>
  • Loading branch information
ryantwolf authored May 22, 2024
1 parent 9f8578b commit 5e46cd8
Show file tree
Hide file tree
Showing 44 changed files with 210 additions and 147 deletions.
29 changes: 12 additions & 17 deletions docs/user-guide/CPUvsGPU.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@

.. _data-curator-cpuvsgpu:

======================================
CPU and GPU Modules with Dask
======================================
Expand All @@ -16,35 +19,27 @@ All of the ``examples/`` use it to set up a Dask cluster.

.. code-block:: python
import argparse
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
def main(args):
# Set up Dask client
client = get_client(args, args.device)
# Set up Dask client
client = get_client(cluster_type="cpu")
# Perform some computation...
# Perform some computation...
def attach_args(parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)):
return add_distributed_args(parser)
``get_client`` takes a bunch of arguments that allow you to initialize or connect to an existing Dask cluster.

if __name__ == "__main__":
main(attach_args().parse_args())
In this short example, you can see that ``get_client`` takes an argparse object with options configured in the corresponding ``add_distributed_args``.

* ``--device`` controls what type of Dask cluster to create. "cpu" will create a CPU based local Dask cluster, while "gpu" will create a GPU based local cluster.
If "cpu" is specified, the number of processes started with the cluster can be specified with the ``--n-workers`` argument.
* ``cluster_type`` controls what type of Dask cluster to create. "cpu" will create a CPU based local Dask cluster, while "gpu" will create a GPU based local cluster.
If "cpu" is specified, the number of processes started with the cluster can be specified with the ``n_workers`` argument.
By default, this argument is set to ``os.cpu_count()``.
If "gpu" is specified, one worker is started per GPU.
It is possible to run entirely CPU-based workflows on a GPU cluster, though the process count (and therefore the number of parallel tasks) will be limited by the number of GPUs on your machine.

* ``--scheduler-address`` and ``--scheduler-file`` are used for connecting to an existing Dask cluster.
* ``scheduler_address`` and ``scheduler_file`` are used for connecting to an existing Dask cluster.
Supplying one of these is essential if you are running a Dask cluster on SLURM or Kubernetes.
All other arguments are ignored if either of these are passed, as the cluster configuration will be done when you create the schduler and works on your cluster.

* The remaining arguments can be modified `here <https://github.com/NVIDIA/NeMo-Curator/blob/main/nemo_curator/utils/distributed_utils.py>`_.

-----------------------------------------
CPU Modules
-----------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions docs/user-guide/DocumentDataset.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@

.. _data-curator-documentdataset:

======================================
Working with DocumentDataset
======================================
Expand Down
3 changes: 2 additions & 1 deletion docs/user-guide/KubernetesCurator.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.. _curator_kubernetes:

.. _data-curator-kubernetes:

======================================
Running NeMo Curator on Kubernetes
Expand Down
11 changes: 4 additions & 7 deletions docs/user-guide/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,22 @@
:ref:`CPU and GPU Modules with Dask <data-curator-cpuvsgpu>`
NeMo Curator provides both CPU based modules and GPU based modules and supports methods for creating compatible Dask clusters and managing the dataset transfer between CPU and GPU.

:ref:`Document Filtering <data-curator-documentfiltering>`
This section describes how to use the 30+ filters available within the NeMo Curator and implement custom filters to apply to the documents within the corpora.
:ref:`Document Filtering <data-curator-qualityfiltering>`
This section describes how to use the 30+ heuristic and classifier filters available within the NeMo Curator and implement custom filters to apply to the documents within the corpora.

:ref:`Language Identification and Unicode Fixing <data-curator-languageidentification>`
Large, unlabeled text corpora often contain a variety of languages. The NeMo Curator provides utilities to identify languages and fix improperly decoded Unicode characters.

:ref:`GPU Accelerated Exact and Fuzzy Deduplication <data-curator-gpu-deduplication>`
Both exact and fuzzy deduplication functionalities are supported in NeMo Curator and accelerated using RAPIDS cuDF.

:ref:`Classifier and Heuristic Quality Filtering <data-curator-qualityfiltering>`
Classifier-based filtering involves training a small text classifer to label a document as either high quality or low quality. Heuristic-based filtering uses simple rules (e.g. Are there too many punctuation marks?) to score a document. NeMo Curator offers both classifier and heuristic-based quality filtering of documents.

:ref:`Downstream Task Decontamination/Deduplication <data-curator-downstream>`
:ref:`Downstream Task Decontamination <data-curator-downstream>`
After training, large language models are usually evaluated by their performance on downstream tasks consisting of unseen test data. When dealing with large datasets, there is a potential for leakage of this test data into the model’s training dataset. NeMo Curator allows you to remove sections of documents in your dataset that are present in downstream tasks.

:ref:`Personally Identifiable Information Identification and Removal <data-curator-pii>`
The purpose of the personally identifiable information (PII) redaction tool is to help scrub sensitive data out of training datasets

:ref:`curator-kubernetes`
:ref:`NeMo Curator on Kubernetes <data-curator-kubernetes>`
Demonstration of how to run the NeMo Curator on a Dask Cluster deployed on top of Kubernetes

.. toctree::
Expand Down
4 changes: 2 additions & 2 deletions examples/blend_and_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
Expand All @@ -28,7 +28,7 @@ def main(args):
output_path = "/path/to/output"

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Blend the datasets
datasets = [DocumentDataset.read_json(path) for path in dataset_paths]
Expand Down
4 changes: 2 additions & 2 deletions examples/classifier_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from nemo_curator.modifiers import FastTextLabelModifier
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def load_dataset(input_data_dir):
Expand Down Expand Up @@ -55,7 +55,7 @@ def main(args):
filtered_output = "/path/to/output"

# Prepare samples for the classifier
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))
low_quality_samples = create_samples(
low_quality_data_path, "__label__lq", num_low_quality_samples
)
Expand Down
3 changes: 2 additions & 1 deletion examples/domain_classifier_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from nemo_curator import DomainClassifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import parse_client_args


def main(args):
Expand Down Expand Up @@ -59,7 +60,7 @@ def main(args):
input_file_path = "/path/to/data"
output_file_path = "./"

client = get_client(args, cluster_type=args.device)
client = get_client(**parse_client_args(args))

input_dataset = DocumentDataset.read_json(
input_file_path, backend="cudf", add_filename=True
Expand Down
4 changes: 2 additions & 2 deletions examples/download_arxiv.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from nemo_curator.download import download_arxiv
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
Expand All @@ -27,7 +27,7 @@ def main(args):
url_limit = 10

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Download and sample data
arxiv = download_arxiv(output_directory, url_limit=url_limit)
Expand Down
4 changes: 2 additions & 2 deletions examples/download_common_crawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from nemo_curator.download import download_common_crawl
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
Expand All @@ -29,7 +29,7 @@ def main(args):
url_limit = 10

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Download and sample data
common_crawl = download_common_crawl(
Expand Down
4 changes: 2 additions & 2 deletions examples/download_wikipedia.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from nemo_curator.download import download_wikipedia
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
Expand All @@ -28,7 +28,7 @@ def main(args):
url_limit = 10

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Download and sample data
wikipedia = download_wikipedia(
Expand Down
4 changes: 2 additions & 2 deletions examples/exact_deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def pre_imports():
Expand All @@ -33,7 +33,7 @@ def main(args):
output_dir = "./"
dataset_id_field = "id"
dataset_text_field = "text"
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))
backend = "cudf" if args.device == "gpu" else "pandas"

if args.device == "gpu":
Expand Down
4 changes: 2 additions & 2 deletions examples/find_pii_and_deidentify.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
from nemo_curator.modifiers.pii_modifier import PiiModifier
from nemo_curator.modules.modify import Modify
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def console_script():
parser = argparse.ArgumentParser()
arguments = add_distributed_args(parser).parse_args()
_ = get_client(arguments, arguments.device)
_ = get_client(**parse_client_args(arguments))

dataframe = pd.DataFrame(
{"text": ["Sarah and Ryan went out to play", "Jensen is the CEO of NVIDIA"]}
Expand Down
4 changes: 2 additions & 2 deletions examples/fuzzy_deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, write_to_disk
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def pre_imports():
Expand All @@ -44,7 +44,7 @@ def main(args):
assert args.device == "gpu"

with dask.config.set({"dataframe.backend": backend}):
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))
client.run(pre_imports)

t0 = time.time()
Expand Down
4 changes: 2 additions & 2 deletions examples/identify_languages_and_fix_unicode.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
get_all_files_paths_under,
separate_by_metadata,
)
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def load_dataset(input_data_dir):
Expand All @@ -49,7 +49,7 @@ def main(args):
language_field = "language"

# Prepare samples for the classifier
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Filter data
multilingual_dataset = load_dataset(multilingual_data_path)
Expand Down
3 changes: 2 additions & 1 deletion examples/quality_classifier_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from nemo_curator import QualityClassifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import parse_client_args


def main(args):
Expand All @@ -31,7 +32,7 @@ def main(args):
input_file_path = "/path/to/data"
output_file_path = "./"

client = get_client(args, cluster_type=args.device)
client = get_client(**parse_client_args(args))

input_dataset = DocumentDataset.read_json(
input_file_path, backend="cudf", add_filename=True
Expand Down
4 changes: 2 additions & 2 deletions examples/raw_download_common_crawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.download_utils import get_common_crawl_urls
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
Expand All @@ -31,7 +31,7 @@ def main(args):
url_limit = 10

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Download the raw compressed WARC files
# Unlike the download_common_crawl function, this does not extract the files
Expand Down
4 changes: 2 additions & 2 deletions examples/task_decontamination.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
)
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def load_dataset(input_data_dir):
Expand Down Expand Up @@ -80,7 +80,7 @@ def main(args):
]

# Prepare samples for the classifier
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Filter data
target_dataset = load_dataset(contaminated_dataset_path)
Expand Down
1 change: 1 addition & 0 deletions nemo_curator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import dask

from .modules import *
from .utils.distributed_utils import get_client

# Dask will automatically convert the list score type
# to a string without this option.
Expand Down
3 changes: 2 additions & 1 deletion nemo_curator/sample_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import parse_client_args


def sample_dataframe(df, num_samples):
Expand Down Expand Up @@ -57,7 +58,7 @@ def sample_dataframe(df, num_samples):
)
args = parser.parse_args()
print(f"Arguments parsed = {args}", flush=True)
client = get_client(args, cluster_type="gpu")
client = get_client(**parse_client_args(args), cluster_type="gpu")

print("Starting sampling workflow", flush=True)
st = time.time()
Expand Down
8 changes: 6 additions & 2 deletions nemo_curator/scripts/add_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
expand_outdir_and_mkdir,
get_all_files_paths_under,
)
from nemo_curator.utils.script_utils import add_distributed_args, attach_bool_arg
from nemo_curator.utils.script_utils import (
add_distributed_args,
attach_bool_arg,
parse_client_args,
)


def main(args):
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

output_dir = expand_outdir_and_mkdir(args.output_data_dir)
files = get_all_files_paths_under(args.input_data_dir)
Expand Down
8 changes: 6 additions & 2 deletions nemo_curator/scripts/blend_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
expand_outdir_and_mkdir,
get_all_files_paths_under,
)
from nemo_curator.utils.script_utils import add_distributed_args, attach_bool_arg
from nemo_curator.utils.script_utils import (
add_distributed_args,
attach_bool_arg,
parse_client_args,
)


def main(args):
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

out_dir = expand_outdir_and_mkdir(args.output_data_dir)

Expand Down
Loading

0 comments on commit 5e46cd8

Please sign in to comment.