Adding fuzzy and semantic dedupe (#428)
Signed-off-by: Rucha Apte <[email protected]>
ruchaa-apte authored Dec 13, 2024
1 parent 079d46f commit 3c3cc98
Showing 5 changed files with 185 additions and 23 deletions.
9 changes: 6 additions & 3 deletions tutorials/dapt-curation/README.md
@@ -37,6 +37,7 @@ The tutorial follows the steps below:<br>
- Heuristic-based quality filtering (number of lines, word count, top N-grams, etc.)
- Fix unicode errors via ftfy
- PII redaction
- GPU-accelerated fuzzy and semantic deduplication
- Step 6: Save the filtered and curated data <br>
- Step 7: Blend datasets and shuffle

@@ -45,8 +46,10 @@ The tutorial follows the steps below:<br>

After installing the NeMo Curator package, install the dependencies and run:

`pip install -r code/requirements.txt`

`python code/main.py`
```bash
pip install -r code/requirements.txt
cd code
python main.py
```

This will download chip-design related datasets and begin the data curation pipeline.
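
Note that the fuzzy and semantic dedupe stages added in this commit only run when the pipeline is started on a GPU Dask cluster (see the `args.device == "gpu"` branch in `main.py` below). Assuming the script exposes NeMo Curator's standard `--device` argument via `ArgumentHelper`, a GPU run would look roughly like this (a sketch, not part of this diff):

```bash
# Sketch only: enable the GPU-only dedupe stages added in this commit.
# Assumes main.py wires up NeMo Curator's ArgumentHelper, which provides --device.
pip install -r code/requirements.txt
cd code
python main.py --device gpu
```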
28 changes: 28 additions & 0 deletions tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml
@@ -0,0 +1,28 @@
# Configuration file for semantic dedup
cache_dir: "workspace/semdedup_cache/text"
num_files: 16

# Embeddings configuration
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128

# Clustering configuration
clustering_save_loc: "clustering_results"
n_clusters: 20
seed: 1234
max_iter: 100
kmeans_with_cos_dist: false

# Semdedup configuration
which_to_keep: "hard"
largest_cluster_size_to_process: 100000
sim_metric: "cosine"

# Extract dedup configuration
eps_thresholds:
- 0.1
- 0.01

# Which threshold to use for extracting deduped data
eps_to_extract: 0.1
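
Downstream, `utils.semantic_dedupe` (added later in this commit) loads this file with `SemDedupConfig.from_yaml` and passes it to `SemDedup`. A minimal sketch of that flow, with an illustrative config path and dataset name:

```python
# Minimal sketch of how the YAML above is consumed; mirrors utils.semantic_dedupe in this commit.
# The config path and `gpu_dataset` are illustrative; SemDedup expects a cuDF-backed DocumentDataset.
from nemo_curator import SemDedup, SemDedupConfig
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir

semdedup_config = SemDedupConfig.from_yaml("configs/text_semantic_dedupe_config.yaml")
expand_outdir_and_mkdir(semdedup_config.cache_dir)  # ensure the cache_dir from the YAML exists
semdedup = SemDedup(config=semdedup_config, id_column_type="str")
# kept = semdedup(gpu_dataset)  # returns the documents retained at the eps_to_extract threshold
```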
76 changes: 61 additions & 15 deletions tutorials/dapt-curation/code/main.py
@@ -27,10 +27,13 @@
CodeLineCountFilter,
TextLineCountFilter,
clean_and_unify,
dedupe,
exact_dedupe,
filter_code,
filter_text,
fuzzy_dedupe,
redact_code,
rm_dir,
semantic_dedupe,
)

import nemo_curator as nc
@@ -48,6 +51,7 @@

SCRIPT_DIR_PATH = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(SCRIPT_DIR_PATH, "data")
CONFIG_DIR = os.path.join(SCRIPT_DIR_PATH, "configs")


def download_sources(
@@ -117,7 +121,6 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
args (Any): Command-line arguments.
text_files (str): Directory path where the text JSONL files are stored.
code_files (str): Directory path where the code JSONL files are stored.
"""
print("Running the curation pipeline...")
# Initialize the Dask cluster.
client = get_client(**ArgumentHelper.parse_client_args(args))

@@ -129,7 +132,7 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
TextLineCountFilter(), text_field="file_type_count", score_type=bool
),
filter_text,
dedupe,
exact_dedupe,
]
)

@@ -141,7 +144,7 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
CodeLineCountFilter(), text_field="file_type_count", score_type=bool
),
filter_code,
dedupe,
exact_dedupe,
redact_code,
]
)
@@ -167,17 +170,54 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
+ orig_dataset_code.df["line_count"].astype(str)
)

print("Executing the curation pipeline...")
dataset_text = curation_steps_text(orig_dataset_text)
dataset_text = dataset_text.persist()

print(f"Original dataset length for text files: {len(orig_dataset_text.df)}")
print(f"After dataprep: {len(dataset_text.df)}")

dataset_code = curation_steps_code(orig_dataset_code)
dataset_code = dataset_code.persist()

print(f"Original dataset length for text files: {len(orig_dataset_text.df)}")
print(f"After dataprep for text files: {len(dataset_text.df)}")
print(f"Original dataset length for code files: {len(orig_dataset_code.df)}")
print(f"After dataprep: {len(dataset_code.df)}")
print(f"After dataprep length for code files: {len(dataset_code.df)}")

if args.device == "gpu":
print("Executing the semantic dedupe pipeline...")
gpu_dataset_text = DocumentDataset(dataset_text.df.to_backend("cudf"))
gpu_dataset_code = DocumentDataset(dataset_code.df.to_backend("cudf"))
sem_dedupe_config_yaml_path = os.path.join(
CONFIG_DIR, "text_semantic_dedupe_config.yaml"
)
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "semantic_dedupe", "text")
rm_dir(CACHE_DIR)
duplicates = semantic_dedupe(
dataset=gpu_dataset_text,
sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path,
cache_dir=CACHE_DIR,
)
unique_ids = duplicates.df.to_backend("pandas").compute()["id"]
semantic_dataset_text = DocumentDataset(
gpu_dataset_text.df[gpu_dataset_text.df.id.isin(unique_ids)]
)
print(f"After semantic dedupe for text files: {len(semantic_dataset_text.df)}")

print("Executing the fuzzy dedupe pipeline...")
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text")
rm_dir(CACHE_DIR)
fuzzy_dataset_text = fuzzy_dedupe(
dataset=semantic_dataset_text, cache=CACHE_DIR
)
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "code")
rm_dir(CACHE_DIR)
fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache=CACHE_DIR)

dataset_text.df = fuzzy_dataset_text.df.to_backend("pandas")
dataset_code.df = fuzzy_dataset_code.df.to_backend("pandas")
print(f"After fuzzy dedupe for text files: {len(dataset_text.df)}")
print(f"After fuzzy dedupe: {len(dataset_code.df)}")

final_dataset_text = dataset_text.persist()
final_dataset_code = dataset_code.persist()

print("Writing the results to disk...")

# Overwrite existing files in the curated directory.
out_path = os.path.join(DATA_DIR, "curated")
@@ -186,15 +226,18 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
shutil.rmtree(out_path)

os.makedirs(out_path)
dataset_text.to_json(out_path, write_to_filename=True)
dataset_code.to_json(out_path, write_to_filename=True)
final_dataset_text.to_json(out_path, write_to_filename=True)
final_dataset_code.to_json(out_path, write_to_filename=True)

print("Writing results to disk completed")

# Split the dataset by file category and save curated files (optional - to create blended datasets)
print("Split dataset by metadata")
separated_data_text = separate_by_metadata(
dataset_text.df, out_path, "category"
final_dataset_text.df, out_path, "category"
).compute()
separated_data_code = separate_by_metadata(
dataset_code.df, out_path, "category"
final_dataset_code.df, out_path, "category"
).compute()

client.close()
@@ -239,6 +282,7 @@ def main():
# Download all the sources and get the list of text and code files.
text_files, code_files = download_sources(100, 100, 100)
run_curation_pipeline(args, text_files, code_files)
print("Data Curation completed")

# blend and shuffle datasets
root_path = os.path.join(DATA_DIR, "curated")
@@ -250,7 +294,9 @@ def main():
]
dataset_weights = [1.0, 4.0, 4.0, 1.0]
target_size = 20

blend_and_shuffle(args, dataset_paths, dataset_weights, target_size)
print("Data Blending completed")


if __name__ == "__main__":
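
In summary, the new GPU branch in `run_curation_pipeline` chains semantic dedupe and fuzzy dedupe on cuDF-backed copies of the data, then moves the results back to pandas before writing. A condensed sketch of that flow (dataset names and cache paths are illustrative):

```python
# Condensed sketch of the GPU branch above; assumes a RAPIDS (cudf/dask-cudf) environment
# and the semantic_dedupe / fuzzy_dedupe helpers added in utils.py in this commit.
from nemo_curator.datasets import DocumentDataset

gpu_text = DocumentDataset(dataset_text.df.to_backend("cudf"))  # pandas -> cuDF
kept = semantic_dedupe(
    dataset=gpu_text,
    sem_dedupe_config_yaml_path="configs/text_semantic_dedupe_config.yaml",
    cache_dir="cache/semantic_dedupe/text",
)
kept_ids = kept.df.to_backend("pandas").compute()["id"]
sem_text = DocumentDataset(gpu_text.df[gpu_text.df.id.isin(kept_ids)])  # keep survivors only
fuzzy_text = fuzzy_dedupe(dataset=sem_text, cache="cache/fuzzy_dedupe/text")
dataset_text.df = fuzzy_text.df.to_backend("pandas")  # cuDF -> pandas for to_json / separate_by_metadata
```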
3 changes: 2 additions & 1 deletion tutorials/dapt-curation/code/requirements.txt
@@ -1,6 +1,7 @@
arxiv
arxiv==2.1.0
arxiv-downloader
cchardet
nltk==3.8.1
poppler-utils
unstructured[all-docs]==0.14.5
unstructured[pdf]
92 changes: 88 additions & 4 deletions tutorials/dapt-curation/code/utils.py
@@ -13,12 +13,23 @@
# limitations under the License.

import json
import os
import re
import shutil

import dask.dataframe as dd
import pandas as pd

from nemo_curator import ExactDuplicates, Modify, ScoreFilter, Sequential
import yaml

from nemo_curator import (
ExactDuplicates,
FuzzyDuplicates,
FuzzyDuplicatesConfig,
Modify,
ScoreFilter,
SemDedup,
SemDedupConfig,
Sequential,
)
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import (
DocumentFilter,
@@ -37,7 +48,10 @@
from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter
from nemo_curator.pii.constants import DEFAULT_LANGUAGE, DEFAULT_MAX_DOC_SIZE
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.file_utils import (
expand_outdir_and_mkdir,
get_all_files_paths_under,
)


class QuotationUnifier(DocumentModifier):
@@ -259,7 +273,7 @@ def func2(row):
return redacted_dataset


def dedupe(dataset: DocumentDataset) -> DocumentDataset:
def exact_dedupe(dataset: DocumentDataset) -> DocumentDataset:
"""
Remove exact duplicates from the given DocumentDataset.
@@ -282,6 +296,71 @@ def dedupe(dataset: DocumentDataset) -> DocumentDataset:
return DocumentDataset(deduped)


def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:
"""
Removes near-duplicate documents and code lines.
Args:
dataset (DocumentDataset): The dataset containing documents.
cache (str): Directory for storing fuzzy deduplication cache files.
Returns:
DocumentDataset: The deduplicated dataset.
"""
fuzzy_dedup_config = FuzzyDuplicatesConfig(
cache_dir=cache,
id_field="id",
text_field="text",
seed=42,
char_ngrams=20,
num_buckets=20,
hashes_per_bucket=13,
use_64_bit_hash=False,
buckets_per_shuffle=5,
false_positive_check=False,
num_anchors=2,
jaccard_threshold=0.8,
)
fuzzy_dup = FuzzyDuplicates(config=fuzzy_dedup_config)
duplicates = fuzzy_dup(dataset)

docs_to_remove = duplicates.df.map_partitions(
lambda x: x[x.group.duplicated(keep="first")]
)

# When there are few duplicates we can compute the results to a list and use `isin`.
duplicate_ids = docs_to_remove.compute().id.to_arrow().to_pylist()
dataset_df = dataset.df
deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
return DocumentDataset(deduped)


def semantic_dedupe(
dataset: DocumentDataset, sem_dedupe_config_yaml_path: str, cache_dir: str
):
"""
Perform semantic deduplication on the given dataset.
Args:
dataset (DocumentDataset): The dataset containing documents.
sem_dedupe_config_yaml_path (str): Path to the semantic dedupe YAML config.
cache_dir (str): Directory for semantic deduplication cache files.
Returns:
The deduplicated DocumentDataset.
"""
partition_lengths = dataset.df.map_partitions(len).compute()
non_empty_partitions = [
i for i, length in enumerate(partition_lengths) if length > 0
]
dataset.df = dataset.df.partitions[non_empty_partitions]

semdedup_config = SemDedupConfig.from_yaml(sem_dedupe_config_yaml_path)
expand_outdir_and_mkdir(semdedup_config.cache_dir)
semdup = SemDedup(config=semdedup_config, id_column_type="str")
duplicates = semdup(dataset)
return duplicates


class TextLineCountFilter(DocumentFilter):
"""
Discard text files based on number of lines.
Expand Down Expand Up @@ -323,3 +402,8 @@ def score_document(self, text: str) -> bool:

def keep_document(self, score) -> bool:
return score


def rm_dir(cache_dir):
# Remove the cache directory if it exists so stale dedupe results are not reused.
if os.path.isdir(cache_dir):
shutil.rmtree(cache_dir)
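
Taken together, the helpers added above could be used standalone roughly as follows (a hypothetical sketch; the paths and the read call are illustrative, not part of the tutorial):

```python
# Hypothetical usage sketch for the dedupe helpers in utils.py; paths are illustrative,
# and the JSONL records are assumed to carry "id" and "text" fields.
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_all_files_paths_under
from utils import exact_dedupe, fuzzy_dedupe, rm_dir

files = get_all_files_paths_under("data/raw")            # gather JSONL shards
dataset = DocumentDataset.read_json(files, add_filename=True)
dataset = exact_dedupe(dataset)                          # hash-based exact duplicate removal

rm_dir("cache/fuzzy_dedupe/text")                        # start from a clean scratch cache
gpu_dataset = DocumentDataset(dataset.df.to_backend("cudf"))  # fuzzy dedupe is GPU-only
deduped = fuzzy_dedupe(dataset=gpu_dataset, cache="cache/fuzzy_dedupe/text")
```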
