Merge branch 'main' into switch_to_crossfit
VibhuJawa committed May 16, 2024
2 parents 90a1a30 + ecd4f4b commit f0686ed
Showing 18 changed files with 583 additions and 39 deletions.
1 change: 1 addition & 0 deletions config/fasttext_langid.yaml
@@ -1,5 +1,6 @@
input_field: text
filters:
- name: nemo_curator.filters.classifier_filter.FastTextLangId
log_score: True
params:
model_path: <Path to the fastText language ID model (e.g., lid.176.bin)>
8 changes: 8 additions & 0 deletions nemo_curator/__init__.py
@@ -12,4 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import dask

from .modules import *

# Dask will automatically convert the list score type
# to a string without this option.
# See https://github.com/NVIDIA/NeMo-Curator/issues/33
# This also happens when reading and writing to files
dask.config.set({"dataframe.convert-string": False})
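For context, a minimal sketch (not part of the commit) of the behavior this setting guards against, per issue #33; the column name and data below are illustrative:

```python
# Illustrative only: keep list-valued score columns as plain Python objects.
# Assumes dask and pandas are installed; column names and values are made up.
import dask
import dask.dataframe as dd
import pandas as pd

# The same option the package __init__ sets globally.
dask.config.set({"dataframe.convert-string": False})

pdf = pd.DataFrame({"lang_id_score": [[0.98, "EN"], [0.71, "DE"]]})
ddf = dd.from_pandas(pdf, npartitions=1)

# With string conversion disabled, the column keeps the `object` dtype, so the
# [score, label] lists survive later operations and round-trips to disk.
print(ddf.dtypes)
```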
1 change: 1 addition & 0 deletions nemo_curator/_compat.py
@@ -20,3 +20,4 @@
# TODO: remove when dask min version gets bumped
DASK_SHUFFLE_METHOD_ARG = _dask_version > parseVersion("2024.1.0")
DASK_P2P_ERROR = _dask_version < parseVersion("2023.10.0")
DASK_SHUFFLE_CAST_DTYPE = _dask_version > parseVersion("2023.12.0")
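As a side note, a minimal sketch of the version-gating pattern these flags follow (assuming `parseVersion` comes from `packaging`, which the excerpt above does not show):

```python
# Sketch of the version-gate pattern; the flag name mirrors the diff above.
import dask
from packaging.version import parse as parseVersion

_dask_version = parseVersion(dask.__version__)
DASK_SHUFFLE_CAST_DTYPE = _dask_version > parseVersion("2023.12.0")

# Callers can then pass newer-only keyword arguments conditionally.
shuffle_kwargs = {"cast_dtype": None} if DASK_SHUFFLE_CAST_DTYPE else {}
```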
5 changes: 0 additions & 5 deletions nemo_curator/filters/classifier_filter.py
@@ -76,11 +76,6 @@ def __init__(self, model_path=None, min_langid_score=0.3):
self._cutoff = min_langid_score
self._name = "lang_id"

# Dask will automatically convert the list score type
# to a string without this option.
# See https://github.com/NVIDIA/NeMo-Curator/issues/33
dask.config.set({"dataframe.convert-string": False})

@batched
def score_document(self, df: pd.Series):
model_attr = f"{self._name}_{self._model_path}"
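For reference, a hedged standalone sketch (not the NeMo Curator API) of what fastText language-ID scoring looks like, assuming the `fasttext` package and a downloaded `lid.176.bin` model; the helper name and return format are illustrative:

```python
# Standalone sketch: score a document's language with fastText.
# Assumes `pip install fasttext` and a local lid.176.bin model (path is an assumption).
import fasttext

model = fasttext.load_model("lid.176.bin")

def score_document(text: str, log_score: bool = True):
    # fastText's predict() rejects newlines, so flatten the document first.
    labels, scores = model.predict(text.replace("\n", " "), k=1)
    lang = labels[0].replace("__label__", "")
    score = round(float(scores[0]), 3)
    # Pairs the score with the label, mirroring the config's log_score flag.
    return [score, lang] if log_score else lang

print(score_document("The quick brown fox jumps over the lazy dog."))
```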
4 changes: 2 additions & 2 deletions nemo_curator/modifiers/pii_modifier.py
@@ -17,7 +17,7 @@
import pandas as pd

from nemo_curator.modifiers import DocumentModifier
from nemo_curator.pii.algorithm import DEFAULT_LANGUAGE
from nemo_curator.pii.constants import DEFAULT_LANGUAGE, DEFAULT_MAX_DOC_SIZE
from nemo_curator.utils.decorators import batched
from nemo_curator.utils.distributed_utils import load_object_on_worker

@@ -97,7 +97,7 @@ def load_deidentifier(self):

if self.device == "gpu":
spacy.require_gpu()
from nemo_curator.pii.algorithm import DEFAULT_MAX_DOC_SIZE, PiiDeidentifier
from nemo_curator.pii.algorithm import PiiDeidentifier

deidentifier: PiiDeidentifier = PiiDeidentifier(
language=self.language,
26 changes: 5 additions & 21 deletions nemo_curator/pii/algorithm.py
@@ -15,6 +15,10 @@
from pathlib import Path
from typing import Any, List, Mapping, Union

# NOTE: Importing this module before cluster creation will create a primary CUDA context,
# which can prevent all GPUs from being used when a cluster/client is created later on.
# Ensure that this module is imported only after cluster creation, and only when the
# algorithm actually needs to be executed. See: https://github.com/NVIDIA/NeMo-Curator/issues/64
import yaml
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry
from presidio_analyzer.nlp_engine import NerModelConfiguration
@@ -30,36 +34,16 @@
from presidio_anonymizer import AnonymizerEngine, BatchAnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

from nemo_curator.pii.constants import DEFAULT_LANGUAGE, SUPPORTED_ENTITIES
from nemo_curator.pii.custom_batch_analyzer_engine import CustomBatchAnalyzerEngine
from nemo_curator.pii.custom_nlp_engine import CustomNlpEngine
from nemo_curator.pii.recognizers.address_recognizer import AddressRecognizer

__all__ = [
"DEFAULT_LANGUAGE",
"SUPPORTED_ENTITIES",
"DEFAULT_MAX_DOC_SIZE",
"PiiDeidentifier",
]


DEFAULT_LANGUAGE = "en"
SUPPORTED_ENTITIES = [
"ADDRESS",
"CREDIT_CARD",
"EMAIL_ADDRESS",
"DATE_TIME",
"IP_ADDRESS",
"LOCATION",
"PERSON",
"URL",
"US_SSN",
"US_PASSPORT",
"US_DRIVER_LICENSE",
"PHONE_NUMBER",
]
DEFAULT_MAX_DOC_SIZE = 2000000


class PiiDeidentifier(object):
"""Cleans PII from an unstructured text"""

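A minimal ordering sketch for the NOTE added above (the driver below is illustrative, not part of the commit, and assumes `dask_cuda` is installed): create the GPU cluster and client first, and import this module only inside work that runs afterwards.

```python
# Illustrative driver: defer the PII import until after the CUDA cluster exists,
# so no primary CUDA context is created before the workers claim their GPUs.
from dask_cuda import LocalCUDACluster
from distributed import Client


def build_deidentifier():
    # Imported lazily, on the worker, per the NOTE in algorithm.py.
    from nemo_curator.pii.algorithm import PiiDeidentifier

    return PiiDeidentifier(language="en")


if __name__ == "__main__":
    cluster = LocalCUDACluster()
    client = Client(cluster)
    # Real code would go on to call the deidentifier's methods per partition.
    future = client.submit(build_deidentifier)
```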
20 changes: 20 additions & 0 deletions nemo_curator/pii/constants.py
@@ -0,0 +1,20 @@
DEFAULT_LANGUAGE = "en"

SUPPORTED_ENTITIES = [
"ADDRESS",
"CREDIT_CARD",
"EMAIL_ADDRESS",
"DATE_TIME",
"IP_ADDRESS",
"LOCATION",
"PERSON",
"URL",
"US_SSN",
"US_PASSPORT",
"US_DRIVER_LICENSE",
"PHONE_NUMBER",
]

DEFAULT_MAX_DOC_SIZE = 2000000

__all__ = ["DEFAULT_LANGUAGE", "SUPPORTED_ENTITIES", "DEFAULT_MAX_DOC_SIZE"]
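A small illustrative use of the new constants module (the validation helper below is hypothetical, not part of the commit):

```python
# Hypothetical helper showing how the shared constants can be consumed.
from nemo_curator.pii.constants import DEFAULT_LANGUAGE, SUPPORTED_ENTITIES


def validate_entities(requested=None, language=DEFAULT_LANGUAGE):
    requested = requested or list(SUPPORTED_ENTITIES)
    unknown = [entity for entity in requested if entity not in SUPPORTED_ENTITIES]
    if unknown:
        raise ValueError(f"Unsupported PII entities: {unknown}")
    return requested, language


print(validate_entities(["EMAIL_ADDRESS", "PHONE_NUMBER"]))
```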
23 changes: 17 additions & 6 deletions nemo_curator/utils/file_utils.py
@@ -181,9 +181,8 @@ def parse_str_of_num_bytes(s, return_str=False):
def _save_jsonl(documents, output_path, start_index=0, max_index=10000, prefix=None):
"""Worker function to write out the data to jsonl files"""

def _output_json(document):
myjson = json.dumps(document, ensure_ascii=False)
return myjson.encode("utf-8")
def _encode_text(document):
return document.strip().encode("utf-8")

def _name(start_index, npad, prefix, i):
tag = str(start_index + i).rjust(npad, "0")
@@ -195,11 +194,22 @@ def _name(start_index, npad, prefix, i):

output_glob_string = os.path.join(output_path, "*.jsonl")

documents.map(_output_json).to_textfiles(
output_files = documents.map(_encode_text).to_textfiles(
output_glob_string,
name_function=name,
)

# Delete empty files generated due to empty partitions in the bag
for output_file in output_files:
try:
if os.path.getsize(output_file) == 0:
os.remove(output_file)
except Exception as exception:
print(
f"An exception occurred when trying to delete {output_file}.\n{exception}",
flush=True,
)


def reshard_jsonl(
input_dir, output_dir, output_file_size="100M", start_index=0, file_prefix=""
@@ -212,7 +222,8 @@ def reshard_jsonl(
output_dir: The output directory where the resharded jsonl files will be written
output_file_size: Approximate size of output files. Must specify with a string and
with the unit K, M or G for kilo, mega or gigabytes
start_index: Starting index for naming the output files
start_index: Starting index for naming the output files. Note: The indices may not
be contiguous if the sharding process would have produced an empty file at that index
file_prefix: Prefix to use to prepend to output file number
"""

@@ -222,7 +233,7 @@
input_files = list(get_all_files_paths_under(input_dir))

# Read in the dask bag
b = db.read_text(input_files, blocksize=blocksize).map(json.loads)
b = db.read_text(input_files, blocksize=blocksize)

# Prepare the output
output_dir = expand_outdir_and_mkdir(output_dir)
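A self-contained sketch of the empty-partition behavior the cleanup loop above guards against (paths and data are illustrative):

```python
# Sketch: a bag whose later partitions are empty writes zero-byte text files,
# which are then removed, matching the cleanup added in _save_jsonl above.
import os

import dask.bag as db

os.makedirs("out_dir", exist_ok=True)

# Only items 0 and 1 survive the filter, so partitions 1-4 end up empty.
bag = db.from_sequence(range(10), npartitions=5).filter(lambda x: x < 2)
output_files = bag.map(str).to_textfiles("out_dir/*.jsonl")

for output_file in output_files:
    if os.path.getsize(output_file) == 0:
        os.remove(output_file)  # drop files written for empty partitions
```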
25 changes: 23 additions & 2 deletions nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py
@@ -16,13 +16,14 @@
from operator import getitem

import numpy as np
import pandas as pd
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.dataframe.shuffle import partitioning_index
from dask.highlevelgraph import HighLevelGraph
from dask.utils import M

from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import rearange_by_column_direct
from nemo_curator._compat import DASK_SHUFFLE_CAST_DTYPE


def _split_part(part, nsplits):
@@ -129,6 +130,21 @@ def extract_partitioning_index(
# a partition-wise merge between `left_df` and `right_df`.
# We call this `global_partitioning_index`:

if DASK_SHUFFLE_CAST_DTYPE:
# Need to use the same type-casting logic as `shuffle`
dtypes = {}
if not isinstance(merge_on, list):
merge_on = [merge_on]
for col, dtype in left_df[merge_on].dtypes.items():
if pd.api.types.is_numeric_dtype(dtype):
dtypes[col] = np.float64
if not dtypes:
dtypes = None
cast_dtype = {"cast_dtype": dtypes}
else:
# `cast_dtype` argument doesn't exist yet
cast_dtype = {}

num_bucket_files = bk_mapping.file_id.max() + 1
global_partitioning_index = left_df[merge_on].map_partitions(
partitioning_index,
@@ -137,6 +153,7 @@
enforce_metadata=False,
transform_divisions=False,
align_dataframes=False,
**cast_dtype,
)

if total_bucket_partitions < num_bucket_files:
@@ -157,7 +174,7 @@
# want to send the rows of `left_df` to the partition
# indices encoded in `global_partitioning_index`. Instead, we
# need to take a modulus with `parts_per_bucket_batch` to
# define a `"_partitoins"` column.
# define a `"_partitions"` column.
left_df["_partitions"] = global_partitioning_index % parts_per_bucket_batch

return left_df, global_partitioning_index
@@ -195,6 +212,10 @@ def merge_left_to_shuffled_right(
subset_bucket_df,
merge_on,
):
from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import (
rearange_by_column_direct,
)

# We are merging an unshuffled batch of "left" partitions
# with a shuffled batch of "right" partitions. To minimize
# data movement, we can manually rearrange the "left" batch
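An isolated, pandas-only sketch of the dtype normalization introduced above (the helper name is not from the library): numeric merge keys are cast to float64 so the hashed partitioning agrees with what Dask's shuffle would compute.

```python
# Illustrative, pandas-only version of the cast logic added in this diff.
import numpy as np
import pandas as pd


def cast_numeric_merge_keys(df: pd.DataFrame, merge_on):
    if not isinstance(merge_on, list):
        merge_on = [merge_on]
    # Match shuffle's behavior: hash numeric keys as float64.
    dtypes = {
        col: np.float64
        for col, dtype in df[merge_on].dtypes.items()
        if pd.api.types.is_numeric_dtype(dtype)
    }
    return df.astype(dtypes) if dtypes else df


df = pd.DataFrame({"part_id": np.arange(4, dtype="int32"), "text": list("abcd")})
print(cast_numeric_merge_keys(df, "part_id").dtypes)  # part_id becomes float64
```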
74 changes: 74 additions & 0 deletions tests/test_fuzzy_dedup.py
@@ -16,14 +16,17 @@
from itertools import combinations
from typing import Iterable

import dask.dataframe as dd
import numpy as np
import pytest
import yaml
from dask import config
from dask.dataframe.utils import assert_eq
from distributed import Client

from nemo_curator import LSH, FuzzyDuplicates, FuzzyDuplicatesConfig, MinHash
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.fuzzy_dedup_utils.merge_utils import extract_partitioning_index
from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from

cudf = gpu_only_import("cudf")
@@ -367,3 +370,74 @@ def test_from_yaml(self, tmpdir):
config = FuzzyDuplicatesConfig.from_yaml(tmpdir / "config.yaml")
for param in yaml_params:
assert getattr(config, param) == yaml_params[param]


@pytest.mark.parametrize(
"backend",
[
"pandas",
pytest.param(
"cudf",
marks=pytest.mark.gpu,
),
],
)
def test_extract_partitioning_index(backend):

def add_partition_info(df, partition_info=None):
if partition_info is None:
df["file_id"] = -1
else:
df["file_id"] = partition_info["number"]
return df

with config.set({"dataframe.backend": backend}):

# Create a random `unshuffled` DataFrame with a
# "part_id" column to be used as the shuffle index
npartitions_left = 7
unshuffled = dd.from_dict(
{"part_id": np.random.randint(25, size=1000, dtype="int32")},
npartitions=npartitions_left,
)

# Create a `bk_mapping` DataFrame that defines
# the "correct" mapping beween "part_id" and
# the destination partition ("file_id")
npartitions_right = 5
bk_mapping = (
dd.from_dict(
{"part_id": np.arange(25, dtype="int32")},
npartitions=npartitions_right,
)
.shuffle("part_id")
.map_partitions(add_partition_info)
.compute()
)

# Use `extract_partitioning_index` to calculate
# the partitioning index and assign it as a new
# "_partitions" column
result, _ = extract_partitioning_index(
unshuffled,
"part_id",
bk_mapping,
npartitions_right,
npartitions_right,
)

# Rename the "_partitions" column, shuffle by "part_id",
# and then assign a "file_id" column to reflect the final
# partition of each row
check = (
result.rename(columns={"_partitions": "expected_file_id"})
.shuffle(
"part_id",
npartitions=npartitions_right,
)
.map_partitions(add_partition_info)
.compute()
)

# Check that the real and expected partitions match
assert (check["file_id"] == check["expected_file_id"]).all()
1 change: 0 additions & 1 deletion tests/test_pii_accuracy.py
@@ -17,7 +17,6 @@
from pathlib import Path

import pandas as pd
import pytest
from dask import dataframe as dd
from dask.distributed import Client, LocalCluster

19 changes: 19 additions & 0 deletions tutorials/peft-curation/README.md
@@ -0,0 +1,19 @@
# Curating Datasets for Parameter Efficient Fine-tuning

This tutorial demonstrates the usage of NeMo Curator's Python API to curate a dataset for
parameter-efficient fine-tuning (PEFT).

In this tutorial, we use the [Enron Emails dataset](https://huggingface.co/datasets/neelblabla/enron_labeled_emails_with_subjects-llama2-7b_finetuning),
a collection of emails annotated with classification labels. Each email has a subject, a body,
and a category (class label). We demonstrate various filtering and processing operations that
can be applied to each record.

## Usage
After installing the NeMo Curator package, you can simply run the following command:
```
python tutorials/peft-curation/main.py
```

By default, this tutorial uses at most 8 workers to run the curation pipeline. If you run into
out-of-memory issues, you can reduce the number of workers by supplying the `--n-workers=N`
argument, where `N` is the number of workers to spawn.
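For example, to run the pipeline with at most 4 workers:
```
python tutorials/peft-curation/main.py --n-workers=4
```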