Remove argparse from get_client function signature #12

Merged
merged 13 commits into from
May 22, 2024
29 changes: 12 additions & 17 deletions docs/user-guide/CPUvsGPU.rst
@@ -1,3 +1,6 @@

.. _data-curator-cpuvsgpu:

======================================
CPU and GPU Modules with Dask
======================================
@@ -16,35 +19,27 @@ All of the ``examples/`` use it to set up a Dask cluster.

.. code-block:: python

import argparse
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args


def main(args):
# Set up Dask client
client = get_client(args, args.device)
# Set up Dask client
client = get_client(cluster_type="cpu")

# Perform some computation...
# Perform some computation...

def attach_args(parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)):
return add_distributed_args(parser)
``get_client`` takes a bunch of arguments that allow you to initialize or connect to an existing Dask cluster.

if __name__ == "__main__":
main(attach_args().parse_args())

In this short example, you can see that ``get_client`` takes an argparse object with options configured in the corresponding ``add_distributed_args``.

* ``--device`` controls what type of Dask cluster to create. "cpu" will create a CPU based local Dask cluster, while "gpu" will create a GPU based local cluster.
If "cpu" is specified, the number of processes started with the cluster can be specified with the ``--n-workers`` argument.
* ``cluster_type`` controls what type of Dask cluster to create. "cpu" will create a CPU based local Dask cluster, while "gpu" will create a GPU based local cluster.
If "cpu" is specified, the number of processes started with the cluster can be specified with the ``n_workers`` argument.
By default, this argument is set to ``os.cpu_count()``.
If "gpu" is specified, one worker is started per GPU.
It is possible to run entirely CPU-based workflows on a GPU cluster, though the process count (and therefore the number of parallel tasks) will be limited by the number of GPUs on your machine.

* ``--scheduler-address`` and ``--scheduler-file`` are used for connecting to an existing Dask cluster.
* ``scheduler_address`` and ``scheduler_file`` are used for connecting to an existing Dask cluster.
Supplying one of these is essential if you are running a Dask cluster on SLURM or Kubernetes.
All other arguments are ignored if either of these is passed, as the cluster configuration will be done when you create the scheduler and workers on your cluster.

* The remaining arguments can be modified `here <https://github.com/NVIDIA/NeMo-Curator/blob/main/nemo_curator/utils/distributed_utils.py>`_.
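
The precedence described in the bullets above can be sketched in a few lines. This is a minimal illustration, not NeMo Curator's code: `describe_cluster_choice` is a hypothetical helper, the scheduler address is a placeholder, and checking `scheduler_address` before `scheduler_file` is an arbitrary choice made for the sketch.

```python
import os


def describe_cluster_choice(
    cluster_type="cpu",
    scheduler_address=None,
    scheduler_file=None,
    n_workers=None,
):
    """Hypothetical helper: report which of the documented rules applies.

    This is NOT NeMo Curator code; it only mirrors the documented precedence.
    """
    # An existing cluster takes priority; every other argument is ignored.
    if scheduler_address is not None:
        return {"mode": "connect", "target": scheduler_address}
    if scheduler_file is not None:
        return {"mode": "connect", "target": scheduler_file}

    if cluster_type == "cpu":
        # Documented default: os.cpu_count() worker processes.
        workers = n_workers if n_workers is not None else os.cpu_count()
        return {"mode": "local-cpu", "n_workers": workers}
    if cluster_type == "gpu":
        # One worker per GPU; this sketch does not try to count GPUs.
        return {"mode": "local-gpu", "n_workers": "one per GPU"}
    raise ValueError(f"Unknown cluster_type: {cluster_type!r}")


# A local CPU cluster with an explicit worker count.
print(describe_cluster_choice(cluster_type="cpu", n_workers=4))
# A scheduler address wins even though cluster_type is also given.
print(describe_cluster_choice(cluster_type="gpu", scheduler_address="tcp://127.0.0.1:8786"))
```

The second call shows why `cluster_type` has no effect once a scheduler is supplied: the cluster already exists, so there is nothing left to configure locally.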

-----------------------------------------
CPU Modules
-----------------------------------------
3 changes: 3 additions & 0 deletions docs/user-guide/DocumentDataset.rst
@@ -1,3 +1,6 @@

.. _data-curator-documentdataset:

======================================
Working with DocumentDataset
======================================
3 changes: 2 additions & 1 deletion docs/user-guide/KubernetesCurator.rst
@@ -1,4 +1,5 @@
.. _curator_kubernetes:

.. _data-curator-kubernetes:

======================================
Running NeMo Curator on Kubernetes
11 changes: 4 additions & 7 deletions docs/user-guide/index.rst
@@ -9,25 +9,22 @@
:ref:`CPU and GPU Modules with Dask <data-curator-cpuvsgpu>`
NeMo Curator provides both CPU based modules and GPU based modules and supports methods for creating compatible Dask clusters and managing the dataset transfer between CPU and GPU.

:ref:`Document Filtering <data-curator-documentfiltering>`
This section describes how to use the 30+ filters available within the NeMo Curator and implement custom filters to apply to the documents within the corpora.
:ref:`Document Filtering <data-curator-qualityfiltering>`
This section describes how to use the 30+ heuristic and classifier filters available within the NeMo Curator and implement custom filters to apply to the documents within the corpora.

:ref:`Language Identification and Unicode Fixing <data-curator-languageidentification>`
Large, unlabeled text corpora often contain a variety of languages. The NeMo Curator provides utilities to identify languages and fix improperly decoded Unicode characters.

:ref:`GPU Accelerated Exact and Fuzzy Deduplication <data-curator-gpu-deduplication>`
Both exact and fuzzy deduplication functionalities are supported in NeMo Curator and accelerated using RAPIDS cuDF.

:ref:`Classifier and Heuristic Quality Filtering <data-curator-qualityfiltering>`
Classifier-based filtering involves training a small text classifier to label a document as either high quality or low quality. Heuristic-based filtering uses simple rules (e.g. Are there too many punctuation marks?) to score a document. NeMo Curator offers both classifier and heuristic-based quality filtering of documents.

:ref:`Downstream Task Decontamination/Deduplication <data-curator-downstream>`
:ref:`Downstream Task Decontamination <data-curator-downstream>`
After training, large language models are usually evaluated by their performance on downstream tasks consisting of unseen test data. When dealing with large datasets, there is a potential for leakage of this test data into the model’s training dataset. NeMo Curator allows you to remove sections of documents in your dataset that are present in downstream tasks.

:ref:`Personally Identifiable Information Identification and Removal <data-curator-pii>`
The purpose of the personally identifiable information (PII) redaction tool is to help scrub sensitive data out of training datasets

:ref:`curator-kubernetes`
:ref:`NeMo Curator on Kubernetes <data-curator-kubernetes>`
Demonstration of how to run the NeMo Curator on a Dask Cluster deployed on top of Kubernetes

.. toctree::
4 changes: 2 additions & 2 deletions examples/blend_and_shuffle.py
@@ -17,7 +17,7 @@
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
@@ -28,7 +28,7 @@ def main(args):
output_path = "/path/to/output"

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Blend the datasets
datasets = [DocumentDataset.read_json(path) for path in dataset_paths]
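
The pattern `get_client(**parse_client_args(args))` recurs in every example script touched by this change. The body of `parse_client_args` is not shown in this diff, so the following is only a sketch of what such a helper might do: `parse_client_args_sketch` is a hypothetical stand-in, and the forwarded keyword names are an assumed subset taken from the documentation change in `CPUvsGPU.rst`.

```python
import argparse


def parse_client_args_sketch(args):
    """Hypothetical stand-in for parse_client_args; the real helper may differ."""
    # Assumed subset of get_client keywords, taken from the docs in this PR.
    forwarded = ("scheduler_address", "scheduler_file", "n_workers")
    values = vars(args)
    kwargs = {name: values[name] for name in forwarded if name in values}
    # The scripts expose the cluster type as --device, while the updated
    # docs name the get_client parameter cluster_type.
    if "device" in values:
        kwargs["cluster_type"] = values["device"]
    return kwargs


parser = argparse.ArgumentParser()
parser.add_argument("--device", default="cpu")
parser.add_argument("--n-workers", type=int, default=2)
parser.add_argument("--scheduler-address", default=None)
parser.add_argument("--scheduler-file", default=None)
parser.add_argument("--batch-size", type=int, default=64)  # script-only option

args = parser.parse_args(["--device", "gpu", "--batch-size", "128"])
client_kwargs = parse_client_args_sketch(args)
print(client_kwargs)
```

Filtering the namespace this way keeps script-only options such as `--batch-size` out of the call, so `get_client` no longer needs to know anything about argparse.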
4 changes: 2 additions & 2 deletions examples/classifier_filtering.py
@@ -23,7 +23,7 @@
from nemo_curator.modifiers import FastTextLabelModifier
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def load_dataset(input_data_dir):
@@ -55,7 +55,7 @@ def main(args):
filtered_output = "/path/to/output"

# Prepare samples for the classifier
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))
low_quality_samples = create_samples(
low_quality_data_path, "__label__lq", num_low_quality_samples
)
@@ -19,6 +19,7 @@
from nemo_curator import DomainClassifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import parse_client_args


def main(args):
@@ -59,7 +60,7 @@ def main(args):
input_file_path = "/path/to/data"
output_file_path = "./"

client = get_client(args, cluster_type=args.device)
client = get_client(**parse_client_args(args))

input_dataset = DocumentDataset.read_json(
input_file_path, backend="cudf", add_filename=True
@@ -19,6 +19,7 @@
from nemo_curator import QualityClassifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import parse_client_args


def main(args):
@@ -31,7 +32,7 @@ def main(args):
input_file_path = "/path/to/data"
output_file_path = "./"

client = get_client(args, cluster_type=args.device)
client = get_client(**parse_client_args(args))

input_dataset = DocumentDataset.from_json(
input_file_path, backend="cudf", add_filename=True
4 changes: 2 additions & 2 deletions examples/download_arxiv.py
@@ -16,7 +16,7 @@

from nemo_curator.download import download_arxiv
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
@@ -27,7 +27,7 @@ def main(args):
url_limit = 10

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Download and sample data
arxiv = download_arxiv(output_directory, url_limit=url_limit)
4 changes: 2 additions & 2 deletions examples/download_common_crawl.py
@@ -16,7 +16,7 @@

from nemo_curator.download import download_common_crawl
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
@@ -29,7 +29,7 @@ def main(args):
url_limit = 10

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Download and sample data
common_crawl = download_common_crawl(
4 changes: 2 additions & 2 deletions examples/download_wikipedia.py
@@ -16,7 +16,7 @@

from nemo_curator.download import download_wikipedia
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
@@ -28,7 +28,7 @@ def main(args):
url_limit = 10

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Download and sample data
wikipedia = download_wikipedia(
4 changes: 2 additions & 2 deletions examples/exact_deduplication.py
@@ -19,7 +19,7 @@
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def pre_imports():
@@ -33,7 +33,7 @@ def main(args):
output_dir = "./"
dataset_id_field = "id"
dataset_text_field = "text"
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))
backend = "cudf" if args.device == "gpu" else "pandas"

if args.device == "gpu":
4 changes: 2 additions & 2 deletions examples/find_pii_and_deidentify.py
@@ -21,13 +21,13 @@
from nemo_curator.modifiers.pii_modifier import PiiModifier
from nemo_curator.modules.modify import Modify
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def console_script():
parser = argparse.ArgumentParser()
arguments = add_distributed_args(parser).parse_args()
_ = get_client(arguments, arguments.device)
_ = get_client(**parse_client_args(arguments))

dataframe = pd.DataFrame(
{"text": ["Sarah and Ryan went out to play", "Jensen is the CEO of NVIDIA"]}
4 changes: 2 additions & 2 deletions examples/fuzzy_deduplication.py
@@ -21,7 +21,7 @@
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, write_to_disk
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def pre_imports():
@@ -44,7 +44,7 @@ def main(args):
assert args.device == "gpu"

with dask.config.set({"dataframe.backend": backend}):
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))
client.run(pre_imports)

t0 = time.time()
4 changes: 2 additions & 2 deletions examples/identify_languages_and_fix_unicode.py
@@ -24,7 +24,7 @@
get_all_files_paths_under,
separate_by_metadata,
)
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def load_dataset(input_data_dir):
@@ -49,7 +49,7 @@ def main(args):
language_field = "language"

# Prepare samples for the classifier
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Filter data
multilingual_dataset = load_dataset(multilingual_data_path)
4 changes: 2 additions & 2 deletions examples/raw_download_common_crawl.py
@@ -18,7 +18,7 @@
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.download_utils import get_common_crawl_urls
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def main(args):
@@ -31,7 +31,7 @@ def main(args):
url_limit = 10

# Set up Dask client
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Download the raw compressed WARC files
# Unlike the download_common_crawl function, this does not extract the files
4 changes: 2 additions & 2 deletions examples/task_decontamination.py
@@ -40,7 +40,7 @@
)
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import add_distributed_args
from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args


def load_dataset(input_data_dir):
@@ -80,7 +80,7 @@ def main(args):
]

# Prepare samples for the classifier
client = get_client(args, args.device)
client = get_client(**parse_client_args(args))

# Filter data
target_dataset = load_dataset(contaminated_dataset_path)
1 change: 1 addition & 0 deletions nemo_curator/__init__.py
@@ -15,6 +15,7 @@
import dask

from .modules import *
from .utils.distributed_utils import get_client

# Dask will automatically convert the list score type
# to a string without this option.
@@ -37,6 +37,7 @@
write_to_disk,
)
from nemo_curator.utils.file_utils import get_remaining_files
from nemo_curator.utils.script_utils import parse_client_args

warnings.filterwarnings("ignore")

@@ -212,7 +213,7 @@ def main():
batch_size = args.batch_size
num_workers = 0

client = get_client(args, cluster_type="gpu")
client = get_client(**parse_client_args(args), cluster_type="gpu")
print("Starting domain classifier inference", flush=True)
global_st = time.time()
files_per_run = len(client.scheduler_info()["workers"]) * 2
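
The call above combines an unpacked dict with an explicit `cluster_type="gpu"` keyword. Python accepts that only when the dict does not already carry the same key. Whether `parse_client_args` ever returns a `cluster_type` entry for these scripts is not visible in this diff, so the sketch below illustrates only the language rule, with `get_client_stub` as a hypothetical stand-in for `get_client`.

```python
def get_client_stub(cluster_type="cpu", n_workers=None):
    """Hypothetical stand-in for get_client that just echoes its arguments."""
    return {"cluster_type": cluster_type, "n_workers": n_workers}


# The unpacked dict and the explicit keyword use different names: accepted.
ok = get_client_stub(**{"n_workers": 4}, cluster_type="gpu")
print(ok)

# The same name arrives twice: Python raises TypeError instead of
# silently picking one of the two values.
try:
    get_client_stub(**{"cluster_type": "cpu"}, cluster_type="gpu")
    collided = False
except TypeError:
    collided = True
print(collided)
```

If a collision were possible, one option would be to drop the key from the dict before the call, so that the hard-coded value is the only one passed.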
@@ -21,6 +21,7 @@
)
from nemo_curator.utils.distributed_utils import get_client, read_data
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import parse_client_args


def value_counts(df, column_name):
@@ -53,7 +54,7 @@ def main():
)
args = parser.parse_args()
print(f"Arguments parsed = {args}", flush=True)
client = get_client(args, cluster_type="gpu")
client = get_client(**parse_client_args(args), cluster_type="gpu")

print("Starting statistics workflow", flush=True)
st = time.time()
@@ -35,6 +35,7 @@
write_to_disk,
)
from nemo_curator.utils.file_utils import get_remaining_files
from nemo_curator.utils.script_utils import parse_client_args

warnings.filterwarnings("ignore")

@@ -250,7 +251,7 @@ def main():
batch_size = args.batch_size
num_workers = 0

client = get_client(args, cluster_type="gpu")
client = get_client(**parse_client_args(args), cluster_type="gpu")
print("Starting quality classifier inference", flush=True)
global_st = time.time()
files_per_run = len(client.scheduler_info()["workers"]) * 2