From 4a183cf34f90e67c32e13ac7f48efb2e1948c82e Mon Sep 17 00:00:00 2001
From: Ryan Wolf
Date: Tue, 2 Apr 2024 16:03:56 -0700
Subject: [PATCH 01/35] Add initial dataset blending function

Signed-off-by: Ryan Wolf
---
 nemo_curator/datasets/doc_dataset.py   |  2 +-
 nemo_curator/modules/blend_datasets.py | 45 ++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)
 create mode 100644 nemo_curator/modules/blend_datasets.py

diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py
index af45f290c..122ea98b6 100644
--- a/nemo_curator/datasets/doc_dataset.py
+++ b/nemo_curator/datasets/doc_dataset.py
@@ -25,7 +25,7 @@ class DocumentDataset:
     Internally it may be distributed across multiple nodes, and may be on GPUs.
     """
 
-    def __init__(self, dataset_df):
+    def __init__(self, dataset_df: dd.DataFrame):
         self.df = dataset_df
 
     def __len__(self):
diff --git a/nemo_curator/modules/blend_datasets.py b/nemo_curator/modules/blend_datasets.py
new file mode 100644
index 000000000..242211c86
--- /dev/null
+++ b/nemo_curator/modules/blend_datasets.py
@@ -0,0 +1,45 @@
+import math
+from typing import List
+
+import dask.dataframe as dd
+
+from nemo_curator.datasets.doc_dataset import DocumentDataset
+
+
+def blend_datasets(
+    target_size: int, datasets: List[DocumentDataset], sampling_weights: List[float]
+) -> DocumentDataset:
+    """
+    Combined multiple datasets into one with different amounts of each dataset
+    Args:
+        target_size: The number of documents the resulting dataset should have
+        datasets: A list of all datasets to combine together
+        sampling_weights: A list of weights to assign to each dataset in the input. Weights will be
+            normalized across the whole list as a part of the sampling process. For example, if the normalized
+            sampling weight for dataset 1 is 0.02, 2% ofthe total samples will be sampled from dataset 1.
+    """
+    if len(datasets) != len(sampling_weights):
+        raise ValueError(
+            f"Different number of datasets and weights specified. {len(datasets)} datasets and {len(sampling_weights)}"
+        )
+
+    weight_sum = sum(sampling_weights)
+    sampling_weights = [weight / weight_sum for weight in sampling_weights]
+    num_documents_per_dataset = [
+        math.ceil(weight * target_size) for weight in sampling_weights
+    ]
+
+    blend_components = []
+    for dataset, num_documents in zip(datasets, num_documents_per_dataset):
+        # Repeatedly sample from the dataset
+        num_epochs = math.ceil(num_documents / len(dataset))
+        for _ in range(num_epochs):
+            sample = dataset.df.head(n=num_documents, npartitions=-1, compute=False)
+            blend_components.append(sample)
+            num_documents -= len(sample)
+
+    blended_dataset = dd.concat(blend_components)
+
+    # TODO: Shuffle the dataset
+
+    return blended_dataset

From 969123cd76cce4c886413ab3dfb4dcf12564945d Mon Sep 17 00:00:00 2001
From: Ryan Wolf
Date: Thu, 4 Apr 2024 10:51:43 -0700
Subject: [PATCH 02/35] Add blend unit tests

Signed-off-by: Ryan Wolf
---
 nemo_curator/modules/__init__.py |  2 +
 tests/test_blend_datasets.py     | 64 ++++++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+)
 create mode 100644 tests/test_blend_datasets.py

diff --git a/nemo_curator/modules/__init__.py b/nemo_curator/modules/__init__.py
index d845441f3..ff0570a26 100644
--- a/nemo_curator/modules/__init__.py
+++ b/nemo_curator/modules/__init__.py
@@ -13,6 +13,7 @@
 # limitations under the License.
from .add_id import AddId +from .blend_datasets import blend_datasets from .exact_dedup import ExactDuplicates from .filter import Filter, Score, ScoreFilter from .fuzzy_dedup import LSH, MinHash @@ -38,4 +39,5 @@ "Sequential", "TaskDecontamination", "AddId", + "blend_datasets", ] diff --git a/tests/test_blend_datasets.py b/tests/test_blend_datasets.py new file mode 100644 index 000000000..ecb63bed1 --- /dev/null +++ b/tests/test_blend_datasets.py @@ -0,0 +1,64 @@ +import dask.dataframe as dd +import pandas as pd +from dask.dataframe.utils import assert_eq + +import nemo_curator as nc +from nemo_curator.datasets import DocumentDataset + + +def list_to_dataset(documents, col_name="text", npartitions=2): + data = {col_name: documents} + pdf = pd.DataFrame(data) + + return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions)) + + +class TestBlending: + def test_blend_as_original(): + first_dataset = list_to_dataset(["one", "two", "three"]) + result_dataset = nc.blend_datasets(len(first_dataset), [first_dataset], [1.0]) + assert_eq(first_dataset, result_dataset) + + def test_equal_blend(): + first_dataset = list_to_dataset(["a", "a"]) + second_dataset = list_to_dataset(["b", "b"]) + result_dataset = nc.blend_datasets( + 2, [first_dataset, second_dataset], [0.5, 0.5] + ) + counts = result_dataset.df["text"].value_counts() + assert len(result_dataset) == 2 + assert counts["a"] == 1 + assert counts["b"] == 1 + + def test_equal_blend_with_weights(): + first_dataset = list_to_dataset(["a", "a"]) + second_dataset = list_to_dataset(["b", "b"]) + result_dataset = nc.blend_datasets( + 2, [first_dataset, second_dataset], [2.0, 2.0] + ) + counts = result_dataset.df["text"].value_counts() + assert len(result_dataset) == 2 + assert counts["a"] == 1 + assert counts["b"] == 1 + + def test_uneven_blend(): + first_dataset = list_to_dataset(["a", "a"]) + second_dataset = list_to_dataset(["b", "b"]) + result_dataset = nc.blend_datasets( + 4, [first_dataset, second_dataset], [3.0, 1.0] + ) + counts = result_dataset.df["text"].value_counts() + assert len(result_dataset) == 4 + assert counts["a"] == 3 + assert counts["b"] == 1 + + def test_very_uneven_blend(): + first_dataset = list_to_dataset(["a", "a"]) + second_dataset = list_to_dataset(["b", "b"]) + result_dataset = nc.blend_datasets( + 4, [first_dataset, second_dataset], [1.0, 0.0] + ) + counts = result_dataset.df["text"].value_counts() + assert len(result_dataset) == 4 + assert counts["a"] == 4 + assert counts["b"] == 0 From d8e79225999078fd613b14e905920c6a4912cdf6 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Thu, 4 Apr 2024 12:34:26 -0700 Subject: [PATCH 03/35] Add self parameter Signed-off-by: Ryan Wolf --- tests/test_blend_datasets.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_blend_datasets.py b/tests/test_blend_datasets.py index ecb63bed1..ef7800428 100644 --- a/tests/test_blend_datasets.py +++ b/tests/test_blend_datasets.py @@ -14,12 +14,12 @@ def list_to_dataset(documents, col_name="text", npartitions=2): class TestBlending: - def test_blend_as_original(): + def test_blend_as_original(self): first_dataset = list_to_dataset(["one", "two", "three"]) result_dataset = nc.blend_datasets(len(first_dataset), [first_dataset], [1.0]) assert_eq(first_dataset, result_dataset) - def test_equal_blend(): + def test_equal_blend(self): first_dataset = list_to_dataset(["a", "a"]) second_dataset = list_to_dataset(["b", "b"]) result_dataset = nc.blend_datasets( @@ -30,7 +30,7 @@ def test_equal_blend(): assert 
counts["a"] == 1 assert counts["b"] == 1 - def test_equal_blend_with_weights(): + def test_equal_blend_with_weights(self): first_dataset = list_to_dataset(["a", "a"]) second_dataset = list_to_dataset(["b", "b"]) result_dataset = nc.blend_datasets( @@ -41,7 +41,7 @@ def test_equal_blend_with_weights(): assert counts["a"] == 1 assert counts["b"] == 1 - def test_uneven_blend(): + def test_uneven_blend(self): first_dataset = list_to_dataset(["a", "a"]) second_dataset = list_to_dataset(["b", "b"]) result_dataset = nc.blend_datasets( @@ -52,7 +52,7 @@ def test_uneven_blend(): assert counts["a"] == 3 assert counts["b"] == 1 - def test_very_uneven_blend(): + def test_very_uneven_blend(self): first_dataset = list_to_dataset(["a", "a"]) second_dataset = list_to_dataset(["b", "b"]) result_dataset = nc.blend_datasets( From fd5f33984e497f0e0b5b70bb7f3fae683b9d2272 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Thu, 4 Apr 2024 12:39:15 -0700 Subject: [PATCH 04/35] Fix return type of blend dataset Signed-off-by: Ryan Wolf --- nemo_curator/modules/blend_datasets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/modules/blend_datasets.py b/nemo_curator/modules/blend_datasets.py index 242211c86..98719cbf9 100644 --- a/nemo_curator/modules/blend_datasets.py +++ b/nemo_curator/modules/blend_datasets.py @@ -42,4 +42,4 @@ def blend_datasets( # TODO: Shuffle the dataset - return blended_dataset + return DocumentDataset(blended_dataset) From 1e1401ba10c5f25b417de39c4a4ebbc8aaef6f7a Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Thu, 4 Apr 2024 12:46:09 -0700 Subject: [PATCH 05/35] Fix blending tests Signed-off-by: Ryan Wolf --- tests/test_blend_datasets.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_blend_datasets.py b/tests/test_blend_datasets.py index ef7800428..d15c58528 100644 --- a/tests/test_blend_datasets.py +++ b/tests/test_blend_datasets.py @@ -17,7 +17,7 @@ class TestBlending: def test_blend_as_original(self): first_dataset = list_to_dataset(["one", "two", "three"]) result_dataset = nc.blend_datasets(len(first_dataset), [first_dataset], [1.0]) - assert_eq(first_dataset, result_dataset) + assert_eq(first_dataset.df, result_dataset.df) def test_equal_blend(self): first_dataset = list_to_dataset(["a", "a"]) @@ -25,7 +25,7 @@ def test_equal_blend(self): result_dataset = nc.blend_datasets( 2, [first_dataset, second_dataset], [0.5, 0.5] ) - counts = result_dataset.df["text"].value_counts() + counts = result_dataset.df["text"].value_counts().compute() assert len(result_dataset) == 2 assert counts["a"] == 1 assert counts["b"] == 1 @@ -36,7 +36,7 @@ def test_equal_blend_with_weights(self): result_dataset = nc.blend_datasets( 2, [first_dataset, second_dataset], [2.0, 2.0] ) - counts = result_dataset.df["text"].value_counts() + counts = result_dataset.df["text"].value_counts().compute() assert len(result_dataset) == 2 assert counts["a"] == 1 assert counts["b"] == 1 @@ -47,7 +47,7 @@ def test_uneven_blend(self): result_dataset = nc.blend_datasets( 4, [first_dataset, second_dataset], [3.0, 1.0] ) - counts = result_dataset.df["text"].value_counts() + counts = result_dataset.df["text"].value_counts().compute() assert len(result_dataset) == 4 assert counts["a"] == 3 assert counts["b"] == 1 @@ -58,7 +58,7 @@ def test_very_uneven_blend(self): result_dataset = nc.blend_datasets( 4, [first_dataset, second_dataset], [1.0, 0.0] ) - counts = result_dataset.df["text"].value_counts() + counts = result_dataset.df["text"].value_counts().compute() assert 
len(result_dataset) == 4 assert counts["a"] == 4 assert counts["b"] == 0 From 5d966ba9880beb37fbc82eddc7e446405099e63e Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Thu, 4 Apr 2024 12:47:08 -0700 Subject: [PATCH 06/35] Change assert statement for very uneven blend Signed-off-by: Ryan Wolf --- tests/test_blend_datasets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_blend_datasets.py b/tests/test_blend_datasets.py index d15c58528..d21b7876c 100644 --- a/tests/test_blend_datasets.py +++ b/tests/test_blend_datasets.py @@ -61,4 +61,4 @@ def test_very_uneven_blend(self): counts = result_dataset.df["text"].value_counts().compute() assert len(result_dataset) == 4 assert counts["a"] == 4 - assert counts["b"] == 0 + assert "b" not in counts["b"] From 64d50405630d28bd864c864b7751fbd85fcf8707 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Thu, 4 Apr 2024 12:47:46 -0700 Subject: [PATCH 07/35] Fix key error Signed-off-by: Ryan Wolf --- tests/test_blend_datasets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_blend_datasets.py b/tests/test_blend_datasets.py index d21b7876c..00fd81fca 100644 --- a/tests/test_blend_datasets.py +++ b/tests/test_blend_datasets.py @@ -61,4 +61,4 @@ def test_very_uneven_blend(self): counts = result_dataset.df["text"].value_counts().compute() assert len(result_dataset) == 4 assert counts["a"] == 4 - assert "b" not in counts["b"] + assert "b" not in counts From 7972669c1f0b477b1afd150f4d2f3b8950bf7738 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Thu, 4 Apr 2024 12:54:56 -0700 Subject: [PATCH 08/35] Add proper proportion blending test Signed-off-by: Ryan Wolf --- tests/test_blend_datasets.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/test_blend_datasets.py b/tests/test_blend_datasets.py index 00fd81fca..a8932a0c7 100644 --- a/tests/test_blend_datasets.py +++ b/tests/test_blend_datasets.py @@ -62,3 +62,16 @@ def test_very_uneven_blend(self): assert len(result_dataset) == 4 assert counts["a"] == 4 assert "b" not in counts + + def test_proper_uneven_blend(self): + first_dataset = list_to_dataset(["a", "b", "c", "d"]) + second_dataset = list_to_dataset(["e", "f"]) + result_dataset = nc.blend_datasets( + 8, [first_dataset, second_dataset], [1.0, 0.0] + ) + counts = result_dataset.df["text"].value_counts().compute() + assert len(result_dataset) == 8 + assert counts["a"] == 2 + assert counts["b"] == 2 + assert counts["c"] == 2 + assert counts["d"] == 2 From f603c2c406f9f68f89022037a051358a05439e09 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Thu, 4 Apr 2024 14:22:47 -0700 Subject: [PATCH 09/35] Add four dataset blend and clarify docs Signed-off-by: Ryan Wolf --- nemo_curator/modules/blend_datasets.py | 6 +++++- tests/test_blend_datasets.py | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/nemo_curator/modules/blend_datasets.py b/nemo_curator/modules/blend_datasets.py index 98719cbf9..02267b72e 100644 --- a/nemo_curator/modules/blend_datasets.py +++ b/nemo_curator/modules/blend_datasets.py @@ -12,11 +12,15 @@ def blend_datasets( """ Combined multiple datasets into one with different amounts of each dataset Args: - target_size: The number of documents the resulting dataset should have + target_size: The number of documents the resulting dataset should have. + The actual size of the dataset may be slightly larger if the normalized weights do not allow + for even mixtures of the datasets. 
datasets: A list of all datasets to combine together sampling_weights: A list of weights to assign to each dataset in the input. Weights will be normalized across the whole list as a part of the sampling process. For example, if the normalized sampling weight for dataset 1 is 0.02, 2% ofthe total samples will be sampled from dataset 1. + There are guaranteed to be math.ceil(normalized_weight_i * target_size) elements from dataset i in + the final blend. """ if len(datasets) != len(sampling_weights): raise ValueError( diff --git a/tests/test_blend_datasets.py b/tests/test_blend_datasets.py index a8932a0c7..110daced6 100644 --- a/tests/test_blend_datasets.py +++ b/tests/test_blend_datasets.py @@ -75,3 +75,17 @@ def test_proper_uneven_blend(self): assert counts["b"] == 2 assert counts["c"] == 2 assert counts["d"] == 2 + + def test_four_dataset_blend(self): + datasets = [] + datasets.append(list_to_dataset(["a", "a"])) + datasets.append(list_to_dataset(["b", "b", "b"])) + datasets.append(list_to_dataset(["c"])) + datasets.append(list_to_dataset(["d", "d", "d", "d"])) + result_dataset = nc.blend_datasets(8, datasets, [1.0, 2.0, 3.0, 4.0]) + counts = result_dataset.df["text"].value_counts().compute() + assert len(result_dataset) == 10 + assert counts["a"] == 1 + assert counts["b"] == 2 + assert counts["c"] == 3 + assert counts["d"] == 4 From 203f88ab5905582a00b0dd8f012907ab9f9cea33 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 11:52:32 -0700 Subject: [PATCH 10/35] Add shuffle module Signed-off-by: Ryan Wolf --- nemo_curator/modules/__init__.py | 3 +- nemo_curator/modules/blend_datasets.py | 49 ------------ nemo_curator/modules/dataset_ops.py | 105 +++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 50 deletions(-) delete mode 100644 nemo_curator/modules/blend_datasets.py create mode 100644 nemo_curator/modules/dataset_ops.py diff --git a/nemo_curator/modules/__init__.py b/nemo_curator/modules/__init__.py index ff0570a26..5fe1e394c 100644 --- a/nemo_curator/modules/__init__.py +++ b/nemo_curator/modules/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. from .add_id import AddId -from .blend_datasets import blend_datasets +from .dataset_ops import blend_datasets, Shuffle from .exact_dedup import ExactDuplicates from .filter import Filter, Score, ScoreFilter from .fuzzy_dedup import LSH, MinHash @@ -40,4 +40,5 @@ "TaskDecontamination", "AddId", "blend_datasets", + "Shuffle", ] diff --git a/nemo_curator/modules/blend_datasets.py b/nemo_curator/modules/blend_datasets.py deleted file mode 100644 index 02267b72e..000000000 --- a/nemo_curator/modules/blend_datasets.py +++ /dev/null @@ -1,49 +0,0 @@ -import math -from typing import List - -import dask.dataframe as dd - -from nemo_curator.datasets.doc_dataset import DocumentDataset - - -def blend_datasets( - target_size: int, datasets: List[DocumentDataset], sampling_weights: List[float] -) -> DocumentDataset: - """ - Combined multiple datasets into one with different amounts of each dataset - Args: - target_size: The number of documents the resulting dataset should have. - The actual size of the dataset may be slightly larger if the normalized weights do not allow - for even mixtures of the datasets. - datasets: A list of all datasets to combine together - sampling_weights: A list of weights to assign to each dataset in the input. Weights will be - normalized across the whole list as a part of the sampling process. 
For example, if the normalized - sampling weight for dataset 1 is 0.02, 2% ofthe total samples will be sampled from dataset 1. - There are guaranteed to be math.ceil(normalized_weight_i * target_size) elements from dataset i in - the final blend. - """ - if len(datasets) != len(sampling_weights): - raise ValueError( - f"Different number of datasets and weights specified. {len(datasets)} datasets and {len(sampling_weights)}" - ) - - weight_sum = sum(sampling_weights) - sampling_weights = [weight / weight_sum for weight in sampling_weights] - num_documents_per_dataset = [ - math.ceil(weight * target_size) for weight in sampling_weights - ] - - blend_components = [] - for dataset, num_documents in zip(datasets, num_documents_per_dataset): - # Repeatedly sample from the dataset - num_epochs = math.ceil(num_documents / len(dataset)) - for _ in range(num_epochs): - sample = dataset.df.head(n=num_documents, npartitions=-1, compute=False) - blend_components.append(sample) - num_documents -= len(sample) - - blended_dataset = dd.concat(blend_components) - - # TODO: Shuffle the dataset - - return DocumentDataset(blended_dataset) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py new file mode 100644 index 000000000..102d4a6fe --- /dev/null +++ b/nemo_curator/modules/dataset_ops.py @@ -0,0 +1,105 @@ +import math +from typing import Any, Callable, List, Optional + +import dask.array as da +import dask.dataframe as dd + +from nemo_curator.datasets.doc_dataset import DocumentDataset + + +def default_filename(partition_num: int) -> str: + return f"file_{partition_num:010d}.jsonl" + + +class Shuffle: + def __init__( + self, + seed: Optional[int] = None, + npartitions: Optional[int] = None, + partition_to_filename: Callable[[int], str] = default_filename, + ) -> None: + """ + Randomly permutes the dataset. This will make the original "filename" column invalid, so if the column is present it will be overwritten. + Args: + seed: The random seed that will be used to determine which file each datapoint goes to. + npartitions: The output number of partitions to create in the dataset. + If None, it will retain the same number of partitions as the original dataset. + partition_to_filename: If the filename column is present, it will be overwritten. + Passing a function in through this argument allows the user to configure what the filename + will look like given the partition number. The default method names the partition + f'file_{partition_num:010d}.jsonl' and should be changed if the user is not using a .jsonl format. 
+        """
+        self.seed = seed
+        self.npartitions = npartitions
+        self.partition_to_filename = partition_to_filename
+
+    def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
+        rand_col = "_shuffle_rand"
+        new_npartitions = (
+            dataset.df.npartitions if self.npartitions is None else self.npartitions
+        )
+
+        rng = da.random.default_rng(seed=self.seed)
+        rand_array = rng.randint(0, new_npartitions, size=len(dataset.df))
+        rand_df = dd.from_dask_array(rand_array, columns=[rand_col]).repartition(
+            npartitions=dataset.df.npartitions
+        )
+        dataset.df[rand_col] = rand_df[rand_col]
+
+        shuffled_df = dataset.df.shuffle(rand_col, npartitions=new_npartitions)
+        shuffled_df.drop(columns=[rand_col])
+
+        if "filename" in shuffled_df.columns:
+            shuffled_df = shuffled_df.map_partitions(self._overwrite_filename)
+
+        return shuffled_df
+
+    def _overwrite_filename(self, partition, partition_info=None):
+        if partition_info is None:
+            return partition
+
+        filename = self.partition_to_filename(partition_info["number"])
+        partition["filename"] = filename
+
+        return partition
+
+
+def blend_datasets(
+    target_size: int, datasets: List[DocumentDataset], sampling_weights: List[float]
+) -> DocumentDataset:
+    """
+    Combined multiple datasets into one with different amounts of each dataset
+    Args:
+        target_size: The number of documents the resulting dataset should have.
+            The actual size of the dataset may be slightly larger if the normalized weights do not allow
+            for even mixtures of the datasets.
+        datasets: A list of all datasets to combine together
+        sampling_weights: A list of weights to assign to each dataset in the input. Weights will be
+            normalized across the whole list as a part of the sampling process. For example, if the normalized
+            sampling weight for dataset 1 is 0.02, 2% ofthe total samples will be sampled from dataset 1.
+            There are guaranteed to be math.ceil(normalized_weight_i * target_size) elements from dataset i in
+            the final blend.
+    """
+    if len(datasets) != len(sampling_weights):
+        raise ValueError(
+            f"Different number of datasets and weights specified. {len(datasets)} datasets and {len(sampling_weights)}"
+        )
+
+    weight_sum = sum(sampling_weights)
+    sampling_weights = [weight / weight_sum for weight in sampling_weights]
+    num_documents_per_dataset = [
+        math.ceil(weight * target_size) for weight in sampling_weights
+    ]
+
+    blend_components = []
+    for dataset, num_documents in zip(datasets, num_documents_per_dataset):
+        # Repeatedly sample from the dataset
+        num_epochs = math.ceil(num_documents / len(dataset))
+        for _ in range(num_epochs):
+            sample = dataset.df.head(n=num_documents, npartitions=-1, compute=False)
+            blend_components.append(sample)
+            num_documents -= len(sample)
+
+    blended_dataset = dd.concat(blend_components)
+
+    return DocumentDataset(blended_dataset)

From 58994324d6c87f0234cd78d0274f2943bf092a7c Mon Sep 17 00:00:00 2001
From: Ryan Wolf
Date: Mon, 22 Apr 2024 14:50:42 -0700
Subject: [PATCH 11/35] Add blend example and tests

Signed-off-by: Ryan Wolf
---
 examples/blend_and_shuffle.py | 53 ++++++++++++++++++++++++++++
 tests/test_shuffle.py         | 66 +++++++++++++++++++++++++++++++++++
 2 files changed, 119 insertions(+)
 create mode 100644 examples/blend_and_shuffle.py
 create mode 100644 tests/test_shuffle.py

diff --git a/examples/blend_and_shuffle.py b/examples/blend_and_shuffle.py
new file mode 100644
index 000000000..e070d5d2a
--- /dev/null
+++ b/examples/blend_and_shuffle.py
@@ -0,0 +1,53 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+
+import nemo_curator as nc
+from nemo_curator.datasets import DocumentDataset
+from nemo_curator.utils.distributed_utils import get_client
+from nemo_curator.utils.script_utils import add_distributed_args
+
+
+def main(args):
+    # Params
+    dataset_paths = ["/path/to/first", "/path/to/second", "/path/to/third"]
+    dataset_weights = [5.0, 2.0, 1.0]
+    target_size = 1000
+    output_path = "/path/to/output"
+
+    # Set up Dask client
+    client = get_client(args, args.device)
+
+    # Blend the datasets
+    datasets = [DocumentDataset.read_json(path) for path in dataset_paths]
+    blended_dataset = nc.blend_datasets(target_size, datasets, dataset_weights)
+
+    shuffle = nc.Shuffle(seed=42)
+    blended_dataset = shuffle(blended_dataset)
+
+    # Save the blend
+    blended_dataset.to_json(output_path)
+
+
+def attach_args(
+    parser=argparse.ArgumentParser(
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    ),
+):
+    return add_distributed_args(parser)
+
+
+if __name__ == "__main__":
+    main(attach_args().parse_args())
diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py
new file mode 100644
index 000000000..9d44fd1e7
--- /dev/null
+++ b/tests/test_shuffle.py
@@ -0,0 +1,66 @@
+import dask.dataframe as dd
+import pandas as pd
+from dask.dataframe.utils import assert_eq
+
+import nemo_curator as nc
+from nemo_curator.datasets import DocumentDataset
+
+
+def list_to_dataset(documents, col_name="text", npartitions=2):
+    data = {col_name: documents}
+    pdf = pd.DataFrame(data)
+
+    return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions))
+
+
+class TestShuffling:
+    def test_shuffle(self):
+        original_dataset = list_to_dataset(["one", "two", "three"])
+        expected_dataset = list_to_dataset(["one", "two", "three"])
+        shuffle = nc.Shuffle(seed=42)
+        result_dataset = shuffle(original_dataset)
+        assert_eq(expected_dataset.df, result_dataset.df)
+
+    def test_new_partitions(self):
+        original_dataset = list_to_dataset(["one", "two", "three"], npartitions=3)
+        expected_dataset = list_to_dataset(["one", "two", "three"])
+        shuffle = nc.Shuffle(seed=42, npartitions=2)
+        result_dataset = shuffle(original_dataset)
+        assert_eq(expected_dataset.df, result_dataset.df)
+
+    def test_filename(self):
+        original_dataset = list_to_dataset(["one", "two", "three"], npartitions=1)
+        original_dataset.df["filename"] = "original.jsonl"
+
+        expected_data = {
+            "text": ["one", "two", "three"],
+            "filename": [
+                "file_0000000001.jsonl",
+                "file_0000000001.jsonl",
+                "file_0000000002.jsonl",
+            ],
+        }
+        pdf = pd.DataFrame(expected_data)
+        expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2))
+
+        shuffle = nc.Shuffle(seed=42, npartitions=2)
+        result_dataset = shuffle(original_dataset)
+        assert_eq(expected_dataset.df, result_dataset.df)
+
+    def test_custom_filenames(self):
+        original_dataset = list_to_dataset(["one", "two", "three"], npartitions=1)
+        original_dataset.df["filename"] = "original.jsonl"
+
+        expected_data = {
+            "text": ["one", "two", "three"],
"filename": ["my_1.test", "my_1.test", "my_2.test"], + } + pdf = pd.DataFrame(expected_data) + expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) + + def filename_fn(x): + return f"my_{x}.test" + + shuffle = nc.Shuffle(seed=42, npartitions=2, partition_to_filename=filename_fn) + result_dataset = shuffle(original_dataset) + assert_eq(expected_dataset.df, result_dataset.df) From f1976103673739f4d5e2ccc536a52633c4808f49 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 15:09:14 -0700 Subject: [PATCH 12/35] Fix random method name Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 102d4a6fe..f4648ab14 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -40,7 +40,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: ) rng = da.random.default_rng(seed=self.seed) - rand_array = rng.randint(0, new_npartitions, size=len(dataset.df)) + rand_array = rng.integers(0, new_npartitions, size=len(dataset.df)) rand_df = dd.from_dask_array(rand_array, columns=[rand_col]).repartition( npartitions=dataset.df.npartitions ) From 2bc5a5f0428b11411f4fe9cf41889fda914e0e34 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 15:10:39 -0700 Subject: [PATCH 13/35] Wrap return type in DocumentDataset Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index f4648ab14..860187cc9 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -52,7 +52,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: if "filename" in shuffled_df.columns: shuffled_df = shuffled_df.map_partitions(self._overwrite_filename) - return shuffled_df + return DocumentDataset(shuffled_df) def _overwrite_filename(self, partition, partition_info=None): if partition_info is None: From bb9803d60d6778862ed9622e169ebed24f4a44a1 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 15:12:08 -0700 Subject: [PATCH 14/35] Save result of column drop Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 860187cc9..8097daf29 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -47,7 +47,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: dataset.df[rand_col] = rand_df[rand_col] shuffled_df = dataset.df.shuffle(rand_col, npartitions=new_npartitions) - shuffled_df.drop(columns=[rand_col]) + shuffled_df = shuffled_df.drop(columns=[rand_col]) if "filename" in shuffled_df.columns: shuffled_df = shuffled_df.map_partitions(self._overwrite_filename) From 414091db7583d14c4553f53159655cbd346cbd9e Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 15:21:35 -0700 Subject: [PATCH 15/35] Change equality check for shuffle tests Signed-off-by: Ryan Wolf --- tests/test_shuffle.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index 9d44fd1e7..561642c3c 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -13,20 +13,24 @@ def list_to_dataset(documents, col_name="text", npartitions=2): return 
DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions)) +def all_equal(left_dataset, right_dataset): + return all(left_dataset.df.compute() == right_dataset.df.compute()) + + class TestShuffling: def test_shuffle(self): original_dataset = list_to_dataset(["one", "two", "three"]) expected_dataset = list_to_dataset(["one", "two", "three"]) shuffle = nc.Shuffle(seed=42) result_dataset = shuffle(original_dataset) - assert_eq(expected_dataset.df, result_dataset.df) + all_equal(expected_dataset, result_dataset) def test_new_partitions(self): original_dataset = list_to_dataset(["one", "two", "three"], npartitions=3) - expected_dataset = list_to_dataset(["one", "two", "three"]) + expected_dataset = list_to_dataset(["one", "two", "three"]).compute() shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) - assert_eq(expected_dataset.df, result_dataset.df) + all_equal(expected_dataset, result_dataset) def test_filename(self): original_dataset = list_to_dataset(["one", "two", "three"], npartitions=1) @@ -45,7 +49,7 @@ def test_filename(self): shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) - assert_eq(expected_dataset.df, result_dataset.df) + all_equal(expected_dataset, result_dataset) def test_custom_filenames(self): original_dataset = list_to_dataset(["one", "two", "three"], npartitions=1) @@ -63,4 +67,4 @@ def filename_fn(x): shuffle = nc.Shuffle(seed=42, npartitions=2, partition_to_filename=filename_fn) result_dataset = shuffle(original_dataset) - assert_eq(expected_dataset.df, result_dataset.df) + all_equal(expected_dataset, result_dataset) From 88ae8af2b54b373543b813885914e8ea009fab4f Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 15:26:10 -0700 Subject: [PATCH 16/35] Fix expected order after shuffle Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 4 +++- tests/test_shuffle.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 8097daf29..a6b181c7b 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -46,7 +46,9 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: ) dataset.df[rand_col] = rand_df[rand_col] - shuffled_df = dataset.df.shuffle(rand_col, npartitions=new_npartitions) + shuffled_df = dataset.df.shuffle( + rand_col, npartitions=new_npartitions, ignore_index=True + ) shuffled_df = shuffled_df.drop(columns=[rand_col]) if "filename" in shuffled_df.columns: diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index 561642c3c..44b87ece8 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -20,14 +20,14 @@ def all_equal(left_dataset, right_dataset): class TestShuffling: def test_shuffle(self): original_dataset = list_to_dataset(["one", "two", "three"]) - expected_dataset = list_to_dataset(["one", "two", "three"]) + expected_dataset = list_to_dataset(["two", "three", "one"]) shuffle = nc.Shuffle(seed=42) result_dataset = shuffle(original_dataset) all_equal(expected_dataset, result_dataset) def test_new_partitions(self): original_dataset = list_to_dataset(["one", "two", "three"], npartitions=3) - expected_dataset = list_to_dataset(["one", "two", "three"]).compute() + expected_dataset = list_to_dataset(["two", "three", "one"], npartitions=3) shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) all_equal(expected_dataset, result_dataset) From 
b0166721bb0ef7f0b500312a57e70c7fb66f1f10 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 15:32:05 -0700 Subject: [PATCH 17/35] Add more documents to shuffle test Signed-off-by: Ryan Wolf --- tests/test_shuffle.py | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index 44b87ece8..cd336c0d2 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -19,28 +19,36 @@ def all_equal(left_dataset, right_dataset): class TestShuffling: def test_shuffle(self): - original_dataset = list_to_dataset(["one", "two", "three"]) - expected_dataset = list_to_dataset(["two", "three", "one"]) + original_dataset = list_to_dataset(["one", "two", "three", "four", "five"]) + expected_dataset = list_to_dataset(["two", "three", "one", "four", "five"]) shuffle = nc.Shuffle(seed=42) result_dataset = shuffle(original_dataset) all_equal(expected_dataset, result_dataset) def test_new_partitions(self): - original_dataset = list_to_dataset(["one", "two", "three"], npartitions=3) - expected_dataset = list_to_dataset(["two", "three", "one"], npartitions=3) + original_dataset = list_to_dataset( + ["one", "two", "three", "four", "five"], npartitions=3 + ) + expected_dataset = list_to_dataset( + ["two", "three", "one", "four", "five"], npartitions=3 + ) shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) all_equal(expected_dataset, result_dataset) def test_filename(self): - original_dataset = list_to_dataset(["one", "two", "three"], npartitions=1) + original_dataset = list_to_dataset( + ["one", "two", "three", "four", "five"], npartitions=1 + ) original_dataset.df["filename"] = "original.jsonl" expected_data = { - "text": ["one", "two", "three"], + "text": ["one", "two", "three", "four", "five"], "filename": [ "file_0000000001.jsonl", "file_0000000001.jsonl", + "file_0000000001.jsonl", + "file_0000000002.jsonl", "file_0000000002.jsonl", ], } @@ -52,12 +60,20 @@ def test_filename(self): all_equal(expected_dataset, result_dataset) def test_custom_filenames(self): - original_dataset = list_to_dataset(["one", "two", "three"], npartitions=1) + original_dataset = list_to_dataset( + ["one", "two", "three", "four", "five"], npartitions=1 + ) original_dataset.df["filename"] = "original.jsonl" expected_data = { - "text": ["one", "two", "three"], - "filename": ["my_1.test", "my_1.test", "my_2.test"], + "text": ["one", "two", "three", "four", "five"], + "filename": [ + "my_1.test", + "my_1.test", + "my_1.test", + "my_2.test", + "my_2.test", + ], } pdf = pd.DataFrame(expected_data) expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) From 48bb22b169afe90a8ec7e4b13c2625e2e9b890dc Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 16:08:52 -0700 Subject: [PATCH 18/35] Add assert statement Signed-off-by: Ryan Wolf --- tests/test_shuffle.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index cd336c0d2..a1e6985ce 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -23,7 +23,7 @@ def test_shuffle(self): expected_dataset = list_to_dataset(["two", "three", "one", "four", "five"]) shuffle = nc.Shuffle(seed=42) result_dataset = shuffle(original_dataset) - all_equal(expected_dataset, result_dataset) + assert all_equal(expected_dataset, result_dataset) def test_new_partitions(self): original_dataset = list_to_dataset( @@ -34,7 +34,7 @@ def test_new_partitions(self): ) shuffle = 
nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) - all_equal(expected_dataset, result_dataset) + assert all_equal(expected_dataset, result_dataset) def test_filename(self): original_dataset = list_to_dataset( @@ -57,7 +57,7 @@ def test_filename(self): shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) - all_equal(expected_dataset, result_dataset) + assert all_equal(expected_dataset, result_dataset) def test_custom_filenames(self): original_dataset = list_to_dataset( @@ -83,4 +83,4 @@ def filename_fn(x): shuffle = nc.Shuffle(seed=42, npartitions=2, partition_to_filename=filename_fn) result_dataset = shuffle(original_dataset) - all_equal(expected_dataset, result_dataset) + assert all_equal(expected_dataset, result_dataset) From cd9fdcf881bf260b5047694d5c102aa6fb394c7d Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 22 Apr 2024 16:47:45 -0700 Subject: [PATCH 19/35] Add within partition shuffle Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 16 ++++++++++------ tests/test_shuffle.py | 20 ++++++++++++++------ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index a6b181c7b..3b5a19071 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -50,18 +50,22 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: rand_col, npartitions=new_npartitions, ignore_index=True ) shuffled_df = shuffled_df.drop(columns=[rand_col]) - - if "filename" in shuffled_df.columns: - shuffled_df = shuffled_df.map_partitions(self._overwrite_filename) + shuffled_df = shuffled_df.map_partitions(self._partition_shuffle) return DocumentDataset(shuffled_df) - def _overwrite_filename(self, partition, partition_info=None): + def _partition_shuffle(self, partition, partition_info=None): if partition_info is None: return partition - filename = self.partition_to_filename(partition_info["number"]) - partition["filename"] = filename + partition_num = partition_info["number"] + partition = partition.sample( + frac=1, random_state=self.seed + partition_num + ).reset_index(drop=True) + + if "filename" in partition: + filename = self.partition_to_filename(partition_num) + partition["filename"] = filename return partition diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index a1e6985ce..b31c8465b 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -1,6 +1,5 @@ import dask.dataframe as dd import pandas as pd -from dask.dataframe.utils import assert_eq import nemo_curator as nc from nemo_curator.datasets import DocumentDataset @@ -14,7 +13,16 @@ def list_to_dataset(documents, col_name="text", npartitions=2): def all_equal(left_dataset, right_dataset): - return all(left_dataset.df.compute() == right_dataset.df.compute()) + left_result = left_dataset.df.compute() + right_result = right_dataset.df.compute() + + l_cols = set(left_result.columns) + r_cols = set(right_result.columns) + assert l_cols == r_cols + for col in left_result.columns: + left = left_result[col] + right = right_result[col] + assert all(left == right), f"Mismatch in {col} column.\n{left}\n{right}\n" class TestShuffling: @@ -23,7 +31,7 @@ def test_shuffle(self): expected_dataset = list_to_dataset(["two", "three", "one", "four", "five"]) shuffle = nc.Shuffle(seed=42) result_dataset = shuffle(original_dataset) - assert all_equal(expected_dataset, result_dataset) + all_equal(expected_dataset, result_dataset) def 
test_new_partitions(self): original_dataset = list_to_dataset( @@ -34,7 +42,7 @@ def test_new_partitions(self): ) shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) - assert all_equal(expected_dataset, result_dataset) + all_equal(expected_dataset, result_dataset) def test_filename(self): original_dataset = list_to_dataset( @@ -57,7 +65,7 @@ def test_filename(self): shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) - assert all_equal(expected_dataset, result_dataset) + all_equal(expected_dataset, result_dataset) def test_custom_filenames(self): original_dataset = list_to_dataset( @@ -83,4 +91,4 @@ def filename_fn(x): shuffle = nc.Shuffle(seed=42, npartitions=2, partition_to_filename=filename_fn) result_dataset = shuffle(original_dataset) - assert all_equal(expected_dataset, result_dataset) + all_equal(expected_dataset, result_dataset) From 491f8b61373cadc7aac729a70882db01f5e43f9f Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Tue, 23 Apr 2024 10:46:02 -0700 Subject: [PATCH 20/35] Refactor add rand column for shuffle Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 30 ++++++++++++++++++----------- tests/test_shuffle.py | 12 ++++++------ 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 3b5a19071..1b8c398c3 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -1,8 +1,8 @@ import math from typing import Any, Callable, List, Optional -import dask.array as da import dask.dataframe as dd +import numpy as np from nemo_curator.datasets.doc_dataset import DocumentDataset @@ -32,28 +32,36 @@ def __init__( self.seed = seed self.npartitions = npartitions self.partition_to_filename = partition_to_filename + self.rand_col = "_shuffle_rand" def __call__(self, dataset: DocumentDataset) -> DocumentDataset: - rand_col = "_shuffle_rand" new_npartitions = ( dataset.df.npartitions if self.npartitions is None else self.npartitions ) - rng = da.random.default_rng(seed=self.seed) - rand_array = rng.integers(0, new_npartitions, size=len(dataset.df)) - rand_df = dd.from_dask_array(rand_array, columns=[rand_col]).repartition( - npartitions=dataset.df.npartitions - ) - dataset.df[rand_col] = rand_df[rand_col] + rand_df = dataset.df.map_partitions(self._add_rand_col, new_npartitions) - shuffled_df = dataset.df.shuffle( - rand_col, npartitions=new_npartitions, ignore_index=True + shuffled_df = rand_df.shuffle( + self.rand_col, npartitions=new_npartitions, ignore_index=True ) - shuffled_df = shuffled_df.drop(columns=[rand_col]) + shuffled_df = shuffled_df.drop(columns=[self.rand_col]) shuffled_df = shuffled_df.map_partitions(self._partition_shuffle) return DocumentDataset(shuffled_df) + def _add_rand_col(self, partition, new_npartitions, partition_info=None): + if partition_info is None: + partition_info = { + "number": 0, + } + + np.random.seed(self.seed + partition_info["number"]) + partition[self.rand_col] = np.random.random_integers( + 0, new_npartitions, size=len(partition) + ) + + return partition + def _partition_shuffle(self, partition, partition_info=None): if partition_info is None: return partition diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index b31c8465b..7696e95f0 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -20,15 +20,15 @@ def all_equal(left_dataset, right_dataset): r_cols = set(right_result.columns) assert l_cols == r_cols for col in 
left_result.columns: - left = left_result[col] - right = right_result[col] + left = left_result[col].reset_index(drop=True) + right = right_result[col].reset_index(drop=True) assert all(left == right), f"Mismatch in {col} column.\n{left}\n{right}\n" class TestShuffling: def test_shuffle(self): original_dataset = list_to_dataset(["one", "two", "three", "four", "five"]) - expected_dataset = list_to_dataset(["two", "three", "one", "four", "five"]) + expected_dataset = list_to_dataset(["three", "one", "five", "two", "four"]) shuffle = nc.Shuffle(seed=42) result_dataset = shuffle(original_dataset) all_equal(expected_dataset, result_dataset) @@ -38,7 +38,7 @@ def test_new_partitions(self): ["one", "two", "three", "four", "five"], npartitions=3 ) expected_dataset = list_to_dataset( - ["two", "three", "one", "four", "five"], npartitions=3 + ["one", "five", "four", "two", "three"], npartitions=3 ) shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) @@ -51,7 +51,7 @@ def test_filename(self): original_dataset.df["filename"] = "original.jsonl" expected_data = { - "text": ["one", "two", "three", "four", "five"], + "text": ["one", "three", "four", "five", "two"], "filename": [ "file_0000000001.jsonl", "file_0000000001.jsonl", @@ -74,7 +74,7 @@ def test_custom_filenames(self): original_dataset.df["filename"] = "original.jsonl" expected_data = { - "text": ["one", "two", "three", "four", "five"], + "text": ["one", "three", "four", "five", "two"], "filename": [ "my_1.test", "my_1.test", From 89009a8f6c747154e81bd114a2fecccfdf2263e2 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Tue, 23 Apr 2024 10:48:51 -0700 Subject: [PATCH 21/35] Fix filename tests Signed-off-by: Ryan Wolf --- tests/test_shuffle.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index 7696e95f0..3c57d01bc 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -53,11 +53,11 @@ def test_filename(self): expected_data = { "text": ["one", "three", "four", "five", "two"], "filename": [ + "file_0000000000.jsonl", + "file_0000000000.jsonl", + "file_0000000000.jsonl", "file_0000000001.jsonl", "file_0000000001.jsonl", - "file_0000000001.jsonl", - "file_0000000002.jsonl", - "file_0000000002.jsonl", ], } pdf = pd.DataFrame(expected_data) @@ -76,11 +76,11 @@ def test_custom_filenames(self): expected_data = { "text": ["one", "three", "four", "five", "two"], "filename": [ + "my_0.test", + "my_0.test", + "my_0.test", "my_1.test", "my_1.test", - "my_1.test", - "my_2.test", - "my_2.test", ], } pdf = pd.DataFrame(expected_data) From aaa64934c54e11daacfdf2f1e94930acf27cc906 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Tue, 23 Apr 2024 16:56:00 -0700 Subject: [PATCH 22/35] Add determinism handling for shuffle Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 4 +++- tests/test_shuffle.py | 9 +++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 1b8c398c3..b8165ba9a 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -21,7 +21,9 @@ def __init__( """ Randomly permutes the dataset. This will make the original "filename" column invalid, so if the column is present it will be overwritten. Args: - seed: The random seed that will be used to determine which file each datapoint goes to. + seed: The random seed that will be used to determine which partition (file) each datapoint goes to. 
+ Even with the seed set, the shuffle is not guaranteed to be deterministic if done in parallel. + Intiailize the Dask client with a single worker and single thread in order to ensure determinism. npartitions: The output number of partitions to create in the dataset. If None, it will retain the same number of partitions as the original dataset. partition_to_filename: If the filename column is present, it will be overwritten. diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index 3c57d01bc..855d55907 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -1,9 +1,14 @@ import dask.dataframe as dd import pandas as pd +from dask.distributed import Client import nemo_curator as nc from nemo_curator.datasets import DocumentDataset +# Single threaded Dask is the only way to guarantee shuffle determinism +# Docs: https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.shuffle.html +client = Client(n_workers=1, threads_per_worker=1) + def list_to_dataset(documents, col_name="text", npartitions=2): data = {col_name: documents} @@ -28,7 +33,7 @@ def all_equal(left_dataset, right_dataset): class TestShuffling: def test_shuffle(self): original_dataset = list_to_dataset(["one", "two", "three", "four", "five"]) - expected_dataset = list_to_dataset(["three", "one", "five", "two", "four"]) + expected_dataset = list_to_dataset(["three", "one", "four", "five", "two"]) shuffle = nc.Shuffle(seed=42) result_dataset = shuffle(original_dataset) all_equal(expected_dataset, result_dataset) @@ -38,7 +43,7 @@ def test_new_partitions(self): ["one", "two", "three", "four", "five"], npartitions=3 ) expected_dataset = list_to_dataset( - ["one", "five", "four", "two", "three"], npartitions=3 + ["one", "four", "three", "five", "two"], npartitions=3 ) shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) From b1e1a42636cf977b45041abc2d8173f4d6014367 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Wed, 24 Apr 2024 11:24:14 -0700 Subject: [PATCH 23/35] Change numpy random function Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index b8165ba9a..34ed63492 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -58,7 +58,7 @@ def _add_rand_col(self, partition, new_npartitions, partition_info=None): } np.random.seed(self.seed + partition_info["number"]) - partition[self.rand_col] = np.random.random_integers( + partition[self.rand_col] = np.random.randint( 0, new_npartitions, size=len(partition) ) From c5f7b3edb87f9b3270af60ec3b405ec55bfaf53e Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Wed, 24 Apr 2024 14:22:52 -0700 Subject: [PATCH 24/35] Fix tests with new random method Signed-off-by: Ryan Wolf --- tests/test_shuffle.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index 855d55907..50fdb568b 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -33,7 +33,7 @@ def all_equal(left_dataset, right_dataset): class TestShuffling: def test_shuffle(self): original_dataset = list_to_dataset(["one", "two", "three", "four", "five"]) - expected_dataset = list_to_dataset(["three", "one", "four", "five", "two"]) + expected_dataset = list_to_dataset(["four", "three", "two", "one", "five"]) shuffle = nc.Shuffle(seed=42) result_dataset = shuffle(original_dataset) 
all_equal(expected_dataset, result_dataset) @@ -43,7 +43,7 @@ def test_new_partitions(self): ["one", "two", "three", "four", "five"], npartitions=3 ) expected_dataset = list_to_dataset( - ["one", "four", "three", "five", "two"], npartitions=3 + ["four", "three", "two", "one", "five"], npartitions=3 ) shuffle = nc.Shuffle(seed=42, npartitions=2) result_dataset = shuffle(original_dataset) @@ -56,11 +56,11 @@ def test_filename(self): original_dataset.df["filename"] = "original.jsonl" expected_data = { - "text": ["one", "three", "four", "five", "two"], + "text": ["four", "three", "two", "one", "five"], "filename": [ - "file_0000000000.jsonl", - "file_0000000000.jsonl", - "file_0000000000.jsonl", + "file_0000000001.jsonl", + "file_0000000001.jsonl", + "file_0000000001.jsonl", "file_0000000001.jsonl", "file_0000000001.jsonl", ], @@ -79,11 +79,11 @@ def test_custom_filenames(self): original_dataset.df["filename"] = "original.jsonl" expected_data = { - "text": ["one", "three", "four", "five", "two"], + "text": ["four", "three", "two", "one", "five"], "filename": [ - "my_0.test", - "my_0.test", - "my_0.test", + "my_1.test", + "my_1.test", + "my_1.test", "my_1.test", "my_1.test", ], From 5dda670c2cf52af14d3400178def9ebb2f3936f5 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Fri, 26 Apr 2024 15:50:10 -0700 Subject: [PATCH 25/35] Remove length call from blending Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 34ed63492..8b2a4692c 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -110,8 +110,7 @@ def blend_datasets( blend_components = [] for dataset, num_documents in zip(datasets, num_documents_per_dataset): # Repeatedly sample from the dataset - num_epochs = math.ceil(num_documents / len(dataset)) - for _ in range(num_epochs): + while num_documents > 0: sample = dataset.df.head(n=num_documents, npartitions=-1, compute=False) blend_components.append(sample) num_documents -= len(sample) From 6cee1fcf74531a8cbba5ee21f25d35229b35dace Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 29 Apr 2024 15:08:51 -0700 Subject: [PATCH 26/35] Improve scaling of blending function Signed-off-by: Ryan Wolf --- nemo_curator/modules/dataset_ops.py | 32 ++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 8b2a4692c..7b24e65ca 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -111,10 +111,40 @@ def blend_datasets( for dataset, num_documents in zip(datasets, num_documents_per_dataset): # Repeatedly sample from the dataset while num_documents > 0: - sample = dataset.df.head(n=num_documents, npartitions=-1, compute=False) + sample = _partition_head(dataset.df, num_documents) blend_components.append(sample) num_documents -= len(sample) blended_dataset = dd.concat(blend_components) return DocumentDataset(blended_dataset) + + +def _partition_head(ddf: dd.DataFrame, n: int) -> dd.DataFrame: + """ + Returns the first n rows in a dataframe while preserving the partitions. 
+ Meant as a replacement for ddf.head(npartitions=-1, compute=False) as it + uses too much memory at large scales + + Args: + ddf: The dataframe to get the first rows from + n: The number of rows to get + """ + original_meta = ddf.dtypes.to_dict() + partition_lengths = ddf.map_partitions(len) + num_partitions = 0 + total_size = 0 + last_length = 0 + for length in partition_lengths: + total_size += length + num_partitions += 1 + last_length = length + if total_size >= n: + break + + delayed_df = ddf.to_delayed() + excess_elems = max(0, total_size - n) + delayed_df = delayed_df[:num_partitions] + delayed_df[-1] = delayed_df[-1].head(last_length - excess_elems) + + return dd.from_delayed(delayed_df, meta=original_meta) From 2a1c7cb2d3920e7fdc12a6521985b322a91afb74 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 29 Apr 2024 16:14:49 -0700 Subject: [PATCH 27/35] Fix blend tests Signed-off-by: Ryan Wolf --- tests/test_blend_datasets.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tests/test_blend_datasets.py b/tests/test_blend_datasets.py index 110daced6..7c7f7e28b 100644 --- a/tests/test_blend_datasets.py +++ b/tests/test_blend_datasets.py @@ -1,6 +1,5 @@ import dask.dataframe as dd import pandas as pd -from dask.dataframe.utils import assert_eq import nemo_curator as nc from nemo_curator.datasets import DocumentDataset @@ -13,11 +12,24 @@ def list_to_dataset(documents, col_name="text", npartitions=2): return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions)) +def all_equal(left_dataset, right_dataset): + left_result = left_dataset.df.compute() + right_result = right_dataset.df.compute() + + l_cols = set(left_result.columns) + r_cols = set(right_result.columns) + assert l_cols == r_cols + for col in left_result.columns: + left = left_result[col].reset_index(drop=True) + right = right_result[col].reset_index(drop=True) + assert all(left == right), f"Mismatch in {col} column.\n{left}\n{right}\n" + + class TestBlending: def test_blend_as_original(self): first_dataset = list_to_dataset(["one", "two", "three"]) result_dataset = nc.blend_datasets(len(first_dataset), [first_dataset], [1.0]) - assert_eq(first_dataset.df, result_dataset.df) + all_equal(first_dataset, result_dataset) def test_equal_blend(self): first_dataset = list_to_dataset(["a", "a"]) From 7025d885b02045d9b2629c0425d50b19d38d9b43 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 29 Apr 2024 18:49:29 -0700 Subject: [PATCH 28/35] Add blending script Signed-off-by: Ryan Wolf --- nemo_curator/scripts/blend_datasets.py | 131 +++++++++++++++++++++++++ setup.py | 1 + 2 files changed, 132 insertions(+) create mode 100644 nemo_curator/scripts/blend_datasets.py diff --git a/nemo_curator/scripts/blend_datasets.py b/nemo_curator/scripts/blend_datasets.py new file mode 100644 index 000000000..b22b0c719 --- /dev/null +++ b/nemo_curator/scripts/blend_datasets.py @@ -0,0 +1,131 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse + +import nemo_curator as nc +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk +from nemo_curator.utils.file_utils import expand_outdir_and_mkdir +from nemo_curator.utils.script_utils import add_distributed_args, attach_bool_arg + + +def main(args): + client = get_client(args, args.device) + + out_dir = expand_outdir_and_mkdir(args.output_data_dir) + + input_dirs = args.input_data_dirs.split(",") + weights = [float(weight) for weight in args.weights.split(",")] + + datasets = [ + DocumentDataset( + read_data(path, file_type=args.input_file_type, backend="pandas") + ) + for path in input_dirs + ] + + output_dataset = nc.blend_datasets(args.target_samples, datasets, weights) + + if args.shuffle: + shuffle = nc.Shuffle(seed=args.seed) + output_dataset = shuffle(output_dataset) + + write_to_disk(output_dataset.df, out_dir, output_type=args.output_file_type) + + client.close() + + +def attach_args( + parser=argparse.ArgumentParser( + """ +Blends a collection of datasets together based on certain weights. + +It takes as input a comma-separated list of dataset directories, the +corresponding weights that should be associated with each datatset, +and the target number of samples to aggregate from across all the datasets. +The file shards of the resulting dataset are not guaranteed to be even +or reflect the original dataset(s). + +A blend is created from these datasets and saved to the specified output directory. +Optionally, the user can choose to shuffle this dataset as well. + """, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) +): + parser.add_argument( + "--input-data-dirs", + type=str, + default=None, + help="Comma-separated list of directories consisting of dataset " + "files that are accessible to all nodes.", + ) + parser.add_argument( + "--weights", + type=str, + default=None, + help="Comma-separated list of floating-point weights corresponding " + "to each dataset passed in --input-data-dirs", + ) + parser.add_argument( + "--output-data-dir", + type=str, + default=None, + help="The output directory to where the blended dataset is" + "retained during filtering will be written. If this argument " + "is not specified, then the document scores from the " + "filter(s) will be written to the document meta data in place", + ) + parser.add_argument( + "--target-samples", + type=int, + default=10000, + help="The number of samples to be included in the output dataset." + " There may be more samples in order to accurately reflect the " + "weight balance, but there will never be less", + ) + attach_bool_arg( + parser, + "shuffle", + default=False, + help_str="Shuffles the dataset after blending", + ) + parser.add_argument( + "--seed", + type=int, + default=None, + help="If specified, the random seed used for shuffling.", + ) + parser.add_argument( + "--input-file-type", + type=str, + default="jsonl", + help="File type of the dataset to be read in. Supported file formats" + " include 'jsonl' (default), 'pickle', or 'parquet'.", + ) + parser.add_argument( + "--output-file-type", + type=str, + default="jsonl", + help="File type the dataset will be written to. 
Supported file formats" + " include 'jsonl' (default), 'pickle', or 'parquet'.", + ) + + parser = add_distributed_args(parser) + + return parser + + +def console_script(): + main(attach_args().parse_args()) diff --git a/setup.py b/setup.py index 8fc60e926..2d8d247db 100644 --- a/setup.py +++ b/setup.py @@ -107,6 +107,7 @@ "quality_classifier_multiple_models_inference=nemo_curator.distributed_data_classification.quality_classifier_multiple_models_inference:console_script", "quality_classifier_inference=nemo_curator.distributed_data_classification.quality_classifier_inference:console_script", "verify_results=nemo_curator.distributed_data_classification.verify_results:console_script", + "blend_datasets=nemo_curator.scripts.blend_datasets:console_script", ], }, ) From b7d3f500f7602fbb6cf4b2c935c42c33036d656f Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 29 Apr 2024 19:03:39 -0700 Subject: [PATCH 29/35] Add additional file paths call Signed-off-by: Ryan Wolf --- nemo_curator/scripts/blend_datasets.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/nemo_curator/scripts/blend_datasets.py b/nemo_curator/scripts/blend_datasets.py index b22b0c719..4f0fc253a 100644 --- a/nemo_curator/scripts/blend_datasets.py +++ b/nemo_curator/scripts/blend_datasets.py @@ -17,7 +17,10 @@ import nemo_curator as nc from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk -from nemo_curator.utils.file_utils import expand_outdir_and_mkdir +from nemo_curator.utils.file_utils import ( + expand_outdir_and_mkdir, + get_all_files_paths_under, +) from nemo_curator.utils.script_utils import add_distributed_args, attach_bool_arg @@ -31,7 +34,11 @@ def main(args): datasets = [ DocumentDataset( - read_data(path, file_type=args.input_file_type, backend="pandas") + read_data( + get_all_files_paths_under(path), + file_type=args.input_file_type, + backend="pandas", + ) ) for path in input_dirs ] From e7c2a9cc359465e26dd3a6c096d6e7e22d3ea5b1 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 29 Apr 2024 22:38:59 -0700 Subject: [PATCH 30/35] Add documentation Signed-off-by: Ryan Wolf --- docs/user-guide/DocumentDataset.rst | 80 +++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/docs/user-guide/DocumentDataset.rst b/docs/user-guide/DocumentDataset.rst index 351e41a95..b6a328f3a 100644 --- a/docs/user-guide/DocumentDataset.rst +++ b/docs/user-guide/DocumentDataset.rst @@ -137,3 +137,83 @@ In these cases, we recommend processing the input dataset in batches using a sim This will read in 64 shards at a time, process them, and write them back to disk. Like ``get_remaining_files``, it only includes files that are in the input directory and not in the output directory. + +############################ +Blending and Shuffling +############################ + +Blending data from multiple sources can be a great way of improving downstream model performance. +This blending can be done during model training itself (i.e., *online* blending) or it can be done before training (i.e., *offline* blending). +Online blending is useful for rapidly iterating in the training process. +Meanwhile, offline blending is useful if you want to distribute the dataset. +Online blending is currently possible in NeMo, and NeMo Curator offers a way to perform blending offline. + +Let's take a look at how datasets can be combined using ``nc.blend_datasets`` + +.. 
code-block:: python + import nemo_curator as nc + + books = DocumentDataset.read_json("books_dataset/") + articles = DocumentDataset.read_json("articles_dataset/") + journals = DocumentDataset.read_json("journals_dataset/") + + datasets = [books, articles, journals] + target_samples = 1000 + weights = [5.0, 2.0, 1.0] + + blended_dataset = nc.blend_datasets(target_samples, datasets, weights) + + blended_dataset.to_json("blended_dataset/") + + +* ``datasets = [books, articles, journals]`` Here, we are choosing to blend three different datasets. + These datasets do not have to be in the same file format, or similar in size. + So long as they can be read in as a DocumentDataset, they will be fine. + The samples from each dataset are always drawn "in order". + The precise order depends on the format. + For sharded jsonl files, the entries at the beginning of the file with the first name in sorted order will be chosen first. +* ``target_samples = 1000`` This is the desired number of samples in the resulting dataset. + By sample, we mean document or just generally a single datapoint. + There may end up being more samples in the dataset depending on the weights. +* ``weights = [5.0, 2.0, 1.0]`` The relative number of samples that should be taken from each dataset. + Given these weights, the blended dataset will have five times as many samples from books as there are samples from journals. + Similarly, there will be two times as many samples from articles when compared to samples from journals. + Weights can be a list of non-negative real numbers. + ``nc.blend_datasets`` will do the normalization and combine the normalized weights with the target samples to determine + how many samples should be taken from each dataset. + In the case of the books dataset, this would be the calculation: + :math:`\lceil target\_samples \cdot w_i\rceil=\lceil 1000\cdot \frac{5}{8}\rceil=625` + If any datasets have fewer samples than the calculated weight, they will be oversampled to meet the quota. + For example, if the books dataset only had 500 documents in it, the first 125 would be repeated to achieve + the 625 samples. +* ``blended_dataset = nc.blend_datasets(target_samples, datasets, weights)`` We now call the function itself. + Afterwards, we are left with a blended dataset that we can operate on like any other dataset. + We can apply filters, deduplicate, or classify the documents. + +Because blending datasets involves combining data from multiple sources, the sharding of the original datasets +cannot be preserved. The options ``add_filename=True`` and ``write_to_filename=True`` for reading and writing +datasets are therefore incompatible with ``nc.blend_datasets``. + + +Shuffling can be another important aspect of dataset management. +NeMo Curator's ``nc.Shuffle`` allows users to reorder all entries in the dataset. + +Here is a small example on how this can be done: + +.. code-block:: python + import nemo_curator as nc + + books = DocumentDataset.read_json("books_dataset/") + + shuffle = nc.Shuffle(seed=42) + + shuffled_books = shuffle(books) + + shuffled_books.to_json("shuffled_books/") + +* ``shuffle = nc.Shuffle(seed=42)`` This creates a shuffle operation that can be chained with + the various other modules in NeMo Curator. In this example, we fix the seed to be 42. + Even with a fixed seed, shuffling is only guaranteed to be deterministic if done with a single-threaded + and single process client. 
Dask allows us to perform the shuffling in parallel for speed gains, so using a + multiprocessing client is recommended where determinism is optional. +* ``shuffled_books = shuffle(books)`` The dataset has now been shuffled, and we can save it to the filesystem. From 6ddff54fbe0a0d6029d1d2fc89cc02f63ffc1459 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 29 Apr 2024 22:44:24 -0700 Subject: [PATCH 31/35] Reformat docs Signed-off-by: Ryan Wolf --- docs/user-guide/DocumentDataset.rst | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/user-guide/DocumentDataset.rst b/docs/user-guide/DocumentDataset.rst index b6a328f3a..c410a94f9 100644 --- a/docs/user-guide/DocumentDataset.rst +++ b/docs/user-guide/DocumentDataset.rst @@ -151,6 +151,7 @@ Online blending is currently possible in NeMo, and NeMo Curator offers a way to Let's take a look at how datasets can be combined using ``nc.blend_datasets`` .. code-block:: python + import nemo_curator as nc books = DocumentDataset.read_json("books_dataset/") @@ -181,8 +182,11 @@ Let's take a look at how datasets can be combined using ``nc.blend_datasets`` Weights can be a list of non-negative real numbers. ``nc.blend_datasets`` will do the normalization and combine the normalized weights with the target samples to determine how many samples should be taken from each dataset. - In the case of the books dataset, this would be the calculation: - :math:`\lceil target\_samples \cdot w_i\rceil=\lceil 1000\cdot \frac{5}{8}\rceil=625` + In the case of the books dataset, the following would be the calculation. + + .. math:: + + `\lceil target\_samples \cdot w_i\rceil=\lceil 1000\cdot \frac{5}{8}\rceil=625` If any datasets have fewer samples than the calculated weight, they will be oversampled to meet the quota. For example, if the books dataset only had 500 documents in it, the first 125 would be repeated to achieve the 625 samples. @@ -201,6 +205,7 @@ NeMo Curator's ``nc.Shuffle`` allows users to reorder all entries in the dataset Here is a small example on how this can be done: .. code-block:: python + import nemo_curator as nc books = DocumentDataset.read_json("books_dataset/") From ccc6e0ce65a2bc3587caf0f1476d7762aa435eb3 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Mon, 29 Apr 2024 22:45:10 -0700 Subject: [PATCH 32/35] Remove backticks Signed-off-by: Ryan Wolf --- docs/user-guide/DocumentDataset.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user-guide/DocumentDataset.rst b/docs/user-guide/DocumentDataset.rst index c410a94f9..ff5e4bae6 100644 --- a/docs/user-guide/DocumentDataset.rst +++ b/docs/user-guide/DocumentDataset.rst @@ -186,7 +186,7 @@ Let's take a look at how datasets can be combined using ``nc.blend_datasets`` .. math:: - `\lceil target\_samples \cdot w_i\rceil=\lceil 1000\cdot \frac{5}{8}\rceil=625` + \lceil target\_samples \cdot w_i\rceil=\lceil 1000\cdot \frac{5}{8}\rceil=625 If any datasets have fewer samples than the calculated weight, they will be oversampled to meet the quota. For example, if the books dataset only had 500 documents in it, the first 125 would be repeated to achieve the 625 samples. 
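The arithmetic in the example above can be checked with a minimal standalone sketch. It simply mirrors the normalization-plus-ceiling step that the documentation attributes to ``blend_datasets``; the variable names are illustrative only:

.. code-block:: python

    import math

    target_samples = 1000
    weights = [5.0, 2.0, 1.0]  # books, articles, journals

    # Normalize the weights, then take the ceiling of each dataset's share.
    total = sum(weights)
    samples_per_dataset = [math.ceil(w / total * target_samples) for w in weights]

    print(samples_per_dataset)  # [625, 250, 125] -> 1000 samples in total

Because each share is rounded up, the blended dataset can come out slightly larger than ``target_samples`` whenever the normalized weights do not divide it evenly, which is why the script's help text promises "never less" rather than an exact count.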
From 53fcda93d101f834cca6753fd8e8143bf1b0921d Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Tue, 30 Apr 2024 14:51:01 -0700 Subject: [PATCH 33/35] Add context manager for shuffle tests Signed-off-by: Ryan Wolf --- tests/test_shuffle.py | 140 +++++++++++++++++++++++------------------- 1 file changed, 76 insertions(+), 64 deletions(-) diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index 50fdb568b..64c01a2ba 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -1,14 +1,10 @@ import dask.dataframe as dd import pandas as pd -from dask.distributed import Client +from dask.distributed import Client, LocalCluster import nemo_curator as nc from nemo_curator.datasets import DocumentDataset -# Single threaded Dask is the only way to guarantee shuffle determinism -# Docs: https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.shuffle.html -client = Client(n_workers=1, threads_per_worker=1) - def list_to_dataset(documents, col_name="text", npartitions=2): data = {col_name: documents} @@ -32,68 +28,84 @@ def all_equal(left_dataset, right_dataset): class TestShuffling: def test_shuffle(self): - original_dataset = list_to_dataset(["one", "two", "three", "four", "five"]) - expected_dataset = list_to_dataset(["four", "three", "two", "one", "five"]) - shuffle = nc.Shuffle(seed=42) - result_dataset = shuffle(original_dataset) - all_equal(expected_dataset, result_dataset) + # Single threaded Dask is the only way to guarantee shuffle determinism + # Docs: https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.shuffle.html + with LocalCluster(n_workers=1, threads_per_worker=1) as cluster: + with Client(cluster): + original_dataset = list_to_dataset( + ["one", "two", "three", "four", "five"] + ) + expected_dataset = list_to_dataset( + ["four", "three", "two", "one", "five"] + ) + shuffle = nc.Shuffle(seed=42) + result_dataset = shuffle(original_dataset) + all_equal(expected_dataset, result_dataset) def test_new_partitions(self): - original_dataset = list_to_dataset( - ["one", "two", "three", "four", "five"], npartitions=3 - ) - expected_dataset = list_to_dataset( - ["four", "three", "two", "one", "five"], npartitions=3 - ) - shuffle = nc.Shuffle(seed=42, npartitions=2) - result_dataset = shuffle(original_dataset) - all_equal(expected_dataset, result_dataset) + with LocalCluster(n_workers=1, threads_per_worker=1) as cluster: + with Client(cluster): + original_dataset = list_to_dataset( + ["one", "two", "three", "four", "five"], npartitions=3 + ) + expected_dataset = list_to_dataset( + ["four", "three", "two", "one", "five"], npartitions=3 + ) + shuffle = nc.Shuffle(seed=42, npartitions=2) + result_dataset = shuffle(original_dataset) + all_equal(expected_dataset, result_dataset) def test_filename(self): - original_dataset = list_to_dataset( - ["one", "two", "three", "four", "five"], npartitions=1 - ) - original_dataset.df["filename"] = "original.jsonl" - - expected_data = { - "text": ["four", "three", "two", "one", "five"], - "filename": [ - "file_0000000001.jsonl", - "file_0000000001.jsonl", - "file_0000000001.jsonl", - "file_0000000001.jsonl", - "file_0000000001.jsonl", - ], - } - pdf = pd.DataFrame(expected_data) - expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) - - shuffle = nc.Shuffle(seed=42, npartitions=2) - result_dataset = shuffle(original_dataset) - all_equal(expected_dataset, result_dataset) + with LocalCluster(n_workers=1, threads_per_worker=1) as cluster: + with Client(cluster): + original_dataset = list_to_dataset( + ["one", 
"two", "three", "four", "five"], npartitions=1 + ) + original_dataset.df["filename"] = "original.jsonl" + + expected_data = { + "text": ["four", "three", "two", "one", "five"], + "filename": [ + "file_0000000001.jsonl", + "file_0000000001.jsonl", + "file_0000000001.jsonl", + "file_0000000001.jsonl", + "file_0000000001.jsonl", + ], + } + pdf = pd.DataFrame(expected_data) + expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) + + shuffle = nc.Shuffle(seed=42, npartitions=2) + result_dataset = shuffle(original_dataset) + all_equal(expected_dataset, result_dataset) def test_custom_filenames(self): - original_dataset = list_to_dataset( - ["one", "two", "three", "four", "five"], npartitions=1 - ) - original_dataset.df["filename"] = "original.jsonl" - - expected_data = { - "text": ["four", "three", "two", "one", "five"], - "filename": [ - "my_1.test", - "my_1.test", - "my_1.test", - "my_1.test", - "my_1.test", - ], - } - pdf = pd.DataFrame(expected_data) - expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) - - def filename_fn(x): - return f"my_{x}.test" - - shuffle = nc.Shuffle(seed=42, npartitions=2, partition_to_filename=filename_fn) - result_dataset = shuffle(original_dataset) - all_equal(expected_dataset, result_dataset) + with LocalCluster(n_workers=1, threads_per_worker=1) as cluster: + with Client(cluster): + original_dataset = list_to_dataset( + ["one", "two", "three", "four", "five"], npartitions=1 + ) + original_dataset.df["filename"] = "original.jsonl" + + expected_data = { + "text": ["four", "three", "two", "one", "five"], + "filename": [ + "my_1.test", + "my_1.test", + "my_1.test", + "my_1.test", + "my_1.test", + ], + } + pdf = pd.DataFrame(expected_data) + expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) + + def filename_fn(x): + return f"my_{x}.test" + + shuffle = nc.Shuffle( + seed=42, npartitions=2, partition_to_filename=filename_fn + ) + result_dataset = shuffle(original_dataset) + all_equal(expected_dataset, result_dataset) From 2c578cbd6623fda1097ed6c46c09f289b68c791a Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Fri, 3 May 2024 13:47:28 -0700 Subject: [PATCH 34/35] Add better deterministic shuffle path Signed-off-by: Ryan Wolf --- docs/user-guide/DocumentDataset.rst | 2 +- nemo_curator/modules/dataset_ops.py | 54 +++++++++++--- tests/test_shuffle.py | 105 ++++++++++++++++++++++++---- 3 files changed, 134 insertions(+), 27 deletions(-) diff --git a/docs/user-guide/DocumentDataset.rst b/docs/user-guide/DocumentDataset.rst index ff5e4bae6..386231db3 100644 --- a/docs/user-guide/DocumentDataset.rst +++ b/docs/user-guide/DocumentDataset.rst @@ -146,7 +146,7 @@ Blending data from multiple sources can be a great way of improving downstream m This blending can be done during model training itself (i.e., *online* blending) or it can be done before training (i.e., *offline* blending). Online blending is useful for rapidly iterating in the training process. Meanwhile, offline blending is useful if you want to distribute the dataset. -Online blending is currently possible in NeMo, and NeMo Curator offers a way to perform blending offline. +Online blending is currently possible in `NeMo via NVIDIA Megatron Core `_, and NeMo Curator offers a way to perform blending offline. 
Let's take a look at how datasets can be combined using ``nc.blend_datasets`` diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 7b24e65ca..87610db07 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -37,13 +37,33 @@ def __init__( self.rand_col = "_shuffle_rand" def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + if self.seed is None: + return self.shuffle_nondeterministic(dataset) + else: + return self.shuffle_deterministic(dataset) + + def shuffle_deterministic(self, dataset: DocumentDataset) -> DocumentDataset: + new_npartitions = ( + dataset.df.npartitions if self.npartitions is None else self.npartitions + ) + + dataset.df[self.rand_col] = dataset.df.map_partitions(self._add_rand_col) + + shuffled_df = dataset.df.set_index(self.rand_col, npartitions=new_npartitions) + + if "filename" in shuffled_df: + shuffled_df["filename"] = shuffled_df.map_partitions(self._add_filename) + + return DocumentDataset(shuffled_df) + + def shuffle_nondeterministic(self, dataset: DocumentDataset) -> DocumentDataset: new_npartitions = ( dataset.df.npartitions if self.npartitions is None else self.npartitions ) - rand_df = dataset.df.map_partitions(self._add_rand_col, new_npartitions) + dataset.df[self.rand_col] = dataset.df.map_partitions(self._add_rand_col) - shuffled_df = rand_df.shuffle( + shuffled_df = dataset.df.shuffle( self.rand_col, npartitions=new_npartitions, ignore_index=True ) shuffled_df = shuffled_df.drop(columns=[self.rand_col]) @@ -51,27 +71,31 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: return DocumentDataset(shuffled_df) - def _add_rand_col(self, partition, new_npartitions, partition_info=None): + def _add_rand_col(self, partition, partition_info=None): if partition_info is None: partition_info = { "number": 0, } - np.random.seed(self.seed + partition_info["number"]) - partition[self.rand_col] = np.random.randint( - 0, new_npartitions, size=len(partition) - ) + if self.seed is not None: + np.random.seed(self.seed + partition_info["number"]) + rand_col = np.random.randint(0, np.iinfo("int64").max, size=len(partition)) - return partition + return rand_col def _partition_shuffle(self, partition, partition_info=None): if partition_info is None: return partition partition_num = partition_info["number"] - partition = partition.sample( - frac=1, random_state=self.seed + partition_num - ).reset_index(drop=True) + if self.seed is not None: + random_state = self.seed + partition_num + else: + random_state = None + + partition = partition.sample(frac=1, random_state=random_state).reset_index( + drop=True + ) if "filename" in partition: filename = self.partition_to_filename(partition_num) @@ -79,6 +103,14 @@ def _partition_shuffle(self, partition, partition_info=None): return partition + def _add_filename(self, partition, partition_info=None): + if partition_info is None: + return ["filename"] * len(partition) + + filename = self.partition_to_filename(partition_info["number"]) + + return [filename for _ in range(len(partition))] + def blend_datasets( target_size: int, datasets: List[DocumentDataset], sampling_weights: List[float] diff --git a/tests/test_shuffle.py b/tests/test_shuffle.py index 64c01a2ba..a23d47906 100644 --- a/tests/test_shuffle.py +++ b/tests/test_shuffle.py @@ -26,7 +26,7 @@ def all_equal(left_dataset, right_dataset): assert all(left == right), f"Mismatch in {col} column.\n{left}\n{right}\n" -class TestShuffling: +class TestShuffleNondeterministic: def 
test_shuffle(self): # Single threaded Dask is the only way to guarantee shuffle determinism # Docs: https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.shuffle.html @@ -36,10 +36,10 @@ def test_shuffle(self): ["one", "two", "three", "four", "five"] ) expected_dataset = list_to_dataset( - ["four", "three", "two", "one", "five"] + ["two", "five", "three", "one", "four"] ) shuffle = nc.Shuffle(seed=42) - result_dataset = shuffle(original_dataset) + result_dataset = shuffle.shuffle_nondeterministic(original_dataset) all_equal(expected_dataset, result_dataset) def test_new_partitions(self): @@ -49,10 +49,10 @@ def test_new_partitions(self): ["one", "two", "three", "four", "five"], npartitions=3 ) expected_dataset = list_to_dataset( - ["four", "three", "two", "one", "five"], npartitions=3 + ["two", "five", "three", "one", "four"], npartitions=3 ) shuffle = nc.Shuffle(seed=42, npartitions=2) - result_dataset = shuffle(original_dataset) + result_dataset = shuffle.shuffle_nondeterministic(original_dataset) all_equal(expected_dataset, result_dataset) def test_filename(self): @@ -64,11 +64,11 @@ def test_filename(self): original_dataset.df["filename"] = "original.jsonl" expected_data = { - "text": ["four", "three", "two", "one", "five"], + "text": ["one", "two", "three", "five", "four"], "filename": [ - "file_0000000001.jsonl", - "file_0000000001.jsonl", - "file_0000000001.jsonl", + "file_0000000000.jsonl", + "file_0000000000.jsonl", + "file_0000000000.jsonl", "file_0000000001.jsonl", "file_0000000001.jsonl", ], @@ -77,7 +77,7 @@ def test_filename(self): expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) shuffle = nc.Shuffle(seed=42, npartitions=2) - result_dataset = shuffle(original_dataset) + result_dataset = shuffle.shuffle_nondeterministic(original_dataset) all_equal(expected_dataset, result_dataset) def test_custom_filenames(self): @@ -89,11 +89,11 @@ def test_custom_filenames(self): original_dataset.df["filename"] = "original.jsonl" expected_data = { - "text": ["four", "three", "two", "one", "five"], + "text": ["one", "two", "three", "five", "four"], "filename": [ - "my_1.test", - "my_1.test", - "my_1.test", + "my_0.test", + "my_0.test", + "my_0.test", "my_1.test", "my_1.test", ], @@ -107,5 +107,80 @@ def filename_fn(x): shuffle = nc.Shuffle( seed=42, npartitions=2, partition_to_filename=filename_fn ) - result_dataset = shuffle(original_dataset) + result_dataset = shuffle.shuffle_nondeterministic(original_dataset) all_equal(expected_dataset, result_dataset) + + def test_shuffle_no_seed(self): + original_dataset = list_to_dataset(["one", "two", "three", "four", "five"]) + shuffle = nc.Shuffle() + result_dataset = shuffle(original_dataset) + assert len(result_dataset.df.compute()) == 5 + + +class TestShuffleDeterministic: + def test_shuffle(self): + original_dataset = list_to_dataset(["one", "two", "three", "four", "five"]) + expected_dataset = list_to_dataset(["five", "four", "three", "one", "two"]) + shuffle = nc.Shuffle(seed=42) + result_dataset = shuffle(original_dataset) + all_equal(expected_dataset, result_dataset) + + def test_new_partitions(self): + original_dataset = list_to_dataset( + ["one", "two", "three", "four", "five"], npartitions=3 + ) + expected_dataset = list_to_dataset( + ["four", "three", "five", "one", "two"], npartitions=3 + ) + shuffle = nc.Shuffle(seed=42, npartitions=2) + result_dataset = shuffle(original_dataset) + all_equal(expected_dataset, result_dataset) + + def test_filename(self): + original_dataset = list_to_dataset( + ["one", 
"two", "three", "four", "five"], npartitions=1 + ) + original_dataset.df["filename"] = "original.jsonl" + + expected_data = { + "text": ["four", "five", "three", "one", "two"], + "filename": [ + "file_0000000000.jsonl", + "file_0000000001.jsonl", + "file_0000000001.jsonl", + "file_0000000001.jsonl", + "file_0000000001.jsonl", + ], + } + pdf = pd.DataFrame(expected_data) + expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) + + shuffle = nc.Shuffle(seed=42, npartitions=2) + result_dataset = shuffle(original_dataset) + all_equal(expected_dataset, result_dataset) + + def test_custom_filenames(self): + original_dataset = list_to_dataset( + ["one", "two", "three", "four", "five"], npartitions=1 + ) + original_dataset.df["filename"] = "original.jsonl" + + expected_data = { + "text": ["four", "five", "three", "one", "two"], + "filename": [ + "my_0.test", + "my_1.test", + "my_1.test", + "my_1.test", + "my_1.test", + ], + } + pdf = pd.DataFrame(expected_data) + expected_dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2)) + + def filename_fn(x): + return f"my_{x}.test" + + shuffle = nc.Shuffle(seed=42, npartitions=2, partition_to_filename=filename_fn) + result_dataset = shuffle(original_dataset) + all_equal(expected_dataset, result_dataset) From fa25b339dc123977e44194ac3f79be10afb62993 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Fri, 3 May 2024 15:18:41 -0700 Subject: [PATCH 35/35] Update documentation and reset index Signed-off-by: Ryan Wolf --- docs/user-guide/DocumentDataset.rst | 5 ++--- nemo_curator/modules/dataset_ops.py | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/user-guide/DocumentDataset.rst b/docs/user-guide/DocumentDataset.rst index 386231db3..0086314a9 100644 --- a/docs/user-guide/DocumentDataset.rst +++ b/docs/user-guide/DocumentDataset.rst @@ -218,7 +218,6 @@ Here is a small example on how this can be done: * ``shuffle = nc.Shuffle(seed=42)`` This creates a shuffle operation that can be chained with the various other modules in NeMo Curator. In this example, we fix the seed to be 42. - Even with a fixed seed, shuffling is only guaranteed to be deterministic if done with a single-threaded - and single process client. Dask allows us to perform the shuffling in parallel for speed gains, so using a - multiprocessing client is recommended where determinism is optional. + Setting the seed will guarantee determinism, but may be slightly slower (20-30% slower) + depending on the dataset size. * ``shuffled_books = shuffle(books)`` The dataset has now been shuffled, and we can save it to the filesystem. diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index 87610db07..38589b1e9 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -22,8 +22,8 @@ def __init__( Randomly permutes the dataset. This will make the original "filename" column invalid, so if the column is present it will be overwritten. Args: seed: The random seed that will be used to determine which partition (file) each datapoint goes to. - Even with the seed set, the shuffle is not guaranteed to be deterministic if done in parallel. - Intiailize the Dask client with a single worker and single thread in order to ensure determinism. + Setting the seed will guarantee determinism, but may be slightly slower (20-30% slower) + depending on the dataset size. npartitions: The output number of partitions to create in the dataset. 
If None, it will retain the same number of partitions as the original dataset. partition_to_filename: If the filename column is present, it will be overwritten. @@ -50,6 +50,7 @@ def shuffle_deterministic(self, dataset: DocumentDataset) -> DocumentDataset: dataset.df[self.rand_col] = dataset.df.map_partitions(self._add_rand_col) shuffled_df = dataset.df.set_index(self.rand_col, npartitions=new_npartitions) + shuffled_df = shuffled_df.reset_index(drop=True) if "filename" in shuffled_df: shuffled_df["filename"] = shuffled_df.map_partitions(self._add_filename)
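To make the effect of the final two patches concrete, here is a minimal, self-contained sketch of the seeded shuffle idea: give every row a random key whose seed depends only on its partition number, then let ``set_index`` perform the global reordering. It is a simplification rather than the module code; in particular, the helper returns a pandas Series to keep the example short.

.. code-block:: python

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd

    ddf = dd.from_pandas(
        pd.DataFrame({"text": ["one", "two", "three", "four", "five"]}),
        npartitions=2,
    )

    def add_rand_col(partition, partition_info=None):
        # The seed depends only on the (stable) partition number, so the keys,
        # and therefore the order produced by sorting on them, are reproducible.
        part_num = partition_info["number"] if partition_info is not None else 0
        rng = np.random.RandomState(42 + part_num)
        keys = rng.randint(0, np.iinfo(np.int64).max, size=len(partition), dtype=np.int64)
        return pd.Series(keys, index=partition.index)

    ddf["_shuffle_rand"] = ddf.map_partitions(add_rand_col)
    shuffled = ddf.set_index("_shuffle_rand").reset_index(drop=True)

    print(shuffled.compute())  # same order on every run

The implementation in ``dataset_ops.py`` differs in its details (it seeds ``np.random`` directly and regenerates the ``filename`` column per output partition), but the determinism argument is the same: the final order depends only on the seeded keys, not on how many workers happened to execute the shuffle.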