From 079d46f181f366c355b63e6bfb4029a212d22b32 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Fri, 13 Dec 2024 10:40:42 -0800
Subject: [PATCH] Allow users to write to single file (#383)

* jsonl support

Signed-off-by: Sarah Yurick

* update param name

Signed-off-by: Sarah Yurick

* run black

Signed-off-by: Sarah Yurick

* update pytest

Signed-off-by: Sarah Yurick

* add parquet

Signed-off-by: Sarah Yurick

* add npartitions check

Signed-off-by: Sarah Yurick

* add compute and repartition functions

Signed-off-by: Sarah Yurick

* add runtimeerror

Signed-off-by: Sarah Yurick

* remove compute function

Signed-off-by: Sarah Yurick

* address broken pytests

Signed-off-by: Sarah Yurick

* run black

Signed-off-by: Sarah Yurick

* add ayush's suggestions

Signed-off-by: Sarah Yurick

* run isort

Signed-off-by: Sarah Yurick

---------

Signed-off-by: Sarah Yurick
Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
---
 examples/classifiers/aegis_example.py         |  2 +-
 examples/classifiers/domain_example.py        |  2 +-
 examples/classifiers/fineweb_edu_example.py   |  2 +-
 examples/classifiers/quality_example.py       |  2 +-
 examples/translation_example.py               |  2 +-
 nemo_curator/datasets/doc_dataset.py          | 15 ++--
 nemo_curator/sample_dataframe.py              |  2 +-
 .../classifiers/aegis_classifier_inference.py |  2 +-
 .../domain_classifier_inference.py            |  2 +-
 .../fineweb_edu_classifier_inference.py       |  2 +-
 .../quality_classifier_inference.py           |  2 +-
 nemo_curator/utils/distributed_utils.py       | 80 ++++++++++++-------
 tests/test_io.py                              |  2 +-
 tests/test_separate_by_metadata.py            |  2 +-
 .../distributed_data_classification.ipynb     |  4 +-
 ...pretraining-vietnamese-data-curation.ipynb | 12 +--
 16 files changed, 80 insertions(+), 55 deletions(-)

diff --git a/examples/classifiers/aegis_example.py b/examples/classifiers/aegis_example.py
index 73650ce80..bfb111c37 100644
--- a/examples/classifiers/aegis_example.py
+++ b/examples/classifiers/aegis_example.py
@@ -44,7 +44,7 @@ def main(args):
     )
 
     result_dataset = safety_classifier(dataset=input_dataset)
-    result_dataset.to_json(output_file_dir=output_file_path, write_to_filename=True)
+    result_dataset.to_json(output_path=output_file_path, write_to_filename=True)
 
     global_et = time.time()
     print(
diff --git a/examples/classifiers/domain_example.py b/examples/classifiers/domain_example.py
index be3fc894d..20f67add8 100644
--- a/examples/classifiers/domain_example.py
+++ b/examples/classifiers/domain_example.py
@@ -39,7 +39,7 @@ def main(args):
     domain_classifier = DomainClassifier(filter_by=["Games", "Sports"])
 
     result_dataset = domain_classifier(dataset=input_dataset)
-    result_dataset.to_json(output_file_dir=output_file_path, write_to_filename=True)
+    result_dataset.to_json(output_path=output_file_path, write_to_filename=True)
 
     global_et = time.time()
     print(
diff --git a/examples/classifiers/fineweb_edu_example.py b/examples/classifiers/fineweb_edu_example.py
index d5bb374d8..f9473f171 100644
--- a/examples/classifiers/fineweb_edu_example.py
+++ b/examples/classifiers/fineweb_edu_example.py
@@ -38,7 +38,7 @@ def main(args):
     fineweb_edu_classifier = FineWebEduClassifier()
 
     result_dataset = fineweb_edu_classifier(dataset=input_dataset)
-    result_dataset.to_json(output_file_dir=output_file_path, write_to_filename=True)
+    result_dataset.to_json(output_path=output_file_path, write_to_filename=True)
 
     global_et = time.time()
     print(
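The classifier examples change only the keyword name from `output_file_dir` to `output_path`; directory outputs behave as before. A minimal sketch of the updated call pattern, with hypothetical paths and assuming the `DomainClassifier` configuration shown in the diff:

```python
from nemo_curator.classifiers import DomainClassifier
from nemo_curator.datasets import DocumentDataset

# Hypothetical input and output locations, for illustration only.
input_dataset = DocumentDataset.read_json(
    "books_dataset/", backend="cudf", add_filename=True
)

domain_classifier = DomainClassifier(filter_by=["Games", "Sports"])
result_dataset = domain_classifier(dataset=input_dataset)

# The keyword is now output_path; a directory path still produces one file per input file.
result_dataset.to_json(output_path="labeled_books/", write_to_filename=True)
```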
diff --git a/examples/classifiers/quality_example.py b/examples/classifiers/quality_example.py
index 6bf09c899..3f4a93e81 100644
--- a/examples/classifiers/quality_example.py
+++ b/examples/classifiers/quality_example.py
@@ -39,7 +39,7 @@ def main(args):
     quality_classifier = QualityClassifier(filter_by=["High", "Medium"])
 
     result_dataset = quality_classifier(dataset=input_dataset)
-    result_dataset.to_json(output_file_dir=output_file_path, write_to_filename=True)
+    result_dataset.to_json(output_path=output_file_path, write_to_filename=True)
 
     global_et = time.time()
     print(
diff --git a/examples/translation_example.py b/examples/translation_example.py
index d9d3be734..d16590166 100644
--- a/examples/translation_example.py
+++ b/examples/translation_example.py
@@ -368,7 +368,7 @@ def main(args):
     )
 
     result_dataset = translator_model(dataset=input_dataset)
-    result_dataset.to_json(output_file_dir=args.output_data_dir, write_to_filename=True)
+    result_dataset.to_json(output_path=args.output_data_dir, write_to_filename=True)
     print(f"Total time taken for translation: {time.time()-st} seconds", flush=True)
 
     client.close()
diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py
index 482456a43..3bebbd7db 100644
--- a/nemo_curator/datasets/doc_dataset.py
+++ b/nemo_curator/datasets/doc_dataset.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import os
+from functools import wraps
 from typing import Any, List, Literal, Optional, Union
 
 import dask.dataframe as dd
@@ -37,6 +38,10 @@ def __len__(self) -> int:
     def persist(self) -> "DocumentDataset":
         return DocumentDataset(self.df.persist())
 
+    @wraps(dd.DataFrame.repartition)
+    def repartition(self, *args, **kwargs) -> "DocumentDataset":
+        return self.__class__(self.df.repartition(*args, **kwargs))
+
     def head(self, n: int = 5) -> Any:
         return self.df.head(n)
 
@@ -146,7 +151,7 @@ def read_pickle(
 
     def to_json(
         self,
-        output_file_dir: str,
+        output_path: str,
         write_to_filename: bool = False,
         keep_filename_column: bool = False,
     ):
@@ -156,7 +161,7 @@ def to_json(
         """
         write_to_disk(
             df=self.df,
-            output_file_dir=output_file_dir,
+            output_path=output_path,
             write_to_filename=write_to_filename,
             keep_filename_column=keep_filename_column,
             output_type="jsonl",
@@ -164,7 +169,7 @@ def to_json(
 
     def to_parquet(
         self,
-        output_file_dir: str,
+        output_path: str,
         write_to_filename: bool = False,
         keep_filename_column: bool = False,
     ):
@@ -174,7 +179,7 @@ def to_parquet(
         """
         write_to_disk(
             df=self.df,
-            output_file_dir=output_file_dir,
+            output_path=output_path,
             write_to_filename=write_to_filename,
             keep_filename_column=keep_filename_column,
             output_type="parquet",
@@ -182,7 +187,7 @@ def to_pickle(
 
     def to_pickle(
         self,
-        output_file_dir: str,
+        output_path: str,
         write_to_filename: bool = False,
     ):
         raise NotImplementedError("DocumentDataset does not support to_pickle yet")
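The `DocumentDataset` changes above rename `output_file_dir` to `output_path` and add a `repartition` passthrough, which is what makes single-file output reachable from the dataset API. A minimal sketch, assuming a hypothetical input directory and output file name:

```python
from nemo_curator.datasets import DocumentDataset

# Hypothetical paths; backend and directory layout are placeholders.
dataset = DocumentDataset.read_json("input_jsonl_dir/", backend="pandas")

# Coalesce to one partition so write_to_disk can take its single-file branch,
# then write everything to one JSONL file instead of a directory of part files.
single_partition = dataset.repartition(npartitions=1)
single_partition.to_json(output_path="curated.jsonl")
```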
diff --git a/nemo_curator/sample_dataframe.py b/nemo_curator/sample_dataframe.py
index 15d5e83f1..59ff33545 100644
--- a/nemo_curator/sample_dataframe.py
+++ b/nemo_curator/sample_dataframe.py
@@ -76,7 +76,7 @@ def sample_dataframe(df, num_samples):
     sampled_df = sample_dataframe(df, num_samples=args.num_samples)
     write_to_disk(
         df=sampled_df,
-        output_file_dir=args.output_file_path,
+        output_path=args.output_file_path,
         write_to_filename=True,
     )
     et = time.time()
diff --git a/nemo_curator/scripts/classifiers/aegis_classifier_inference.py b/nemo_curator/scripts/classifiers/aegis_classifier_inference.py
index bdff0b816..e6225d268 100644
--- a/nemo_curator/scripts/classifiers/aegis_classifier_inference.py
+++ b/nemo_curator/scripts/classifiers/aegis_classifier_inference.py
@@ -86,7 +86,7 @@ def main():
 
     write_to_disk(
         df=df,
-        output_file_dir=args.output_data_dir,
+        output_path=args.output_data_dir,
         write_to_filename=add_filename,
         output_type=args.output_file_type,
     )
diff --git a/nemo_curator/scripts/classifiers/domain_classifier_inference.py b/nemo_curator/scripts/classifiers/domain_classifier_inference.py
index 8854f4fb0..94f3bab06 100644
--- a/nemo_curator/scripts/classifiers/domain_classifier_inference.py
+++ b/nemo_curator/scripts/classifiers/domain_classifier_inference.py
@@ -86,7 +86,7 @@ def main():
 
     write_to_disk(
         df=df,
-        output_file_dir=args.output_data_dir,
+        output_path=args.output_data_dir,
         write_to_filename=add_filename,
         output_type=args.output_file_type,
     )
diff --git a/nemo_curator/scripts/classifiers/fineweb_edu_classifier_inference.py b/nemo_curator/scripts/classifiers/fineweb_edu_classifier_inference.py
index c131c0194..26d17e2e2 100644
--- a/nemo_curator/scripts/classifiers/fineweb_edu_classifier_inference.py
+++ b/nemo_curator/scripts/classifiers/fineweb_edu_classifier_inference.py
@@ -87,7 +87,7 @@ def main():
 
     write_to_disk(
         df=df,
-        output_file_dir=args.output_data_dir,
+        output_path=args.output_data_dir,
         write_to_filename=add_filename,
         output_type=args.output_file_type,
     )
diff --git a/nemo_curator/scripts/classifiers/quality_classifier_inference.py b/nemo_curator/scripts/classifiers/quality_classifier_inference.py
index 475a0104c..ca5ee5415 100644
--- a/nemo_curator/scripts/classifiers/quality_classifier_inference.py
+++ b/nemo_curator/scripts/classifiers/quality_classifier_inference.py
@@ -87,7 +87,7 @@ def main():
 
     write_to_disk(
         df=df,
-        output_file_dir=args.output_data_dir,
+        output_path=args.output_data_dir,
         write_to_filename=add_filename,
         output_type=args.output_file_type,
     )
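Each inference script above passes its output directory through `write_to_disk` unchanged; only the keyword changes. A self-contained sketch of that directory-style call, using a toy DataFrame and a hypothetical output directory in place of the classifier output and CLI arguments:

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator.utils.distributed_utils import write_to_disk

# Toy stand-in for a classifier's output; the "filename" column drives
# per-file writes when write_to_filename=True.
pdf = pd.DataFrame(
    {"text": ["doc one", "doc two"], "filename": ["shard_0.jsonl", "shard_1.jsonl"]}
)
df = dd.from_pandas(pdf, npartitions=2)

# Directory output path (hypothetical): one JSONL file per "filename" value.
write_to_disk(
    df=df,
    output_path="classifier_results/",
    write_to_filename=True,
    output_type="jsonl",
)
```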
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index 27d41679c..75931b95c 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -641,7 +641,7 @@ def _single_partition_write_to_simple_bitext(
 
 
 def _merge_tmp_simple_bitext_partitions(tmp_output_dir: str, output_dir: str):
-    """Merge partitions of simple bitext files in `tmp_output_dir` into files at `output_file_dir`.
+    """Merge partitions of simple bitext files in `tmp_output_dir` into files at `output_dir`.
 
     Args:
         tmp_output_dir (str): temporary directory that has all the simple bitext output partitions,
@@ -675,7 +675,7 @@ def _merge_tmp_simple_bitext_partitions(tmp_output_dir: str, output_dir: str):
 
 def write_to_disk(
     df,
-    output_file_dir: str,
+    output_path: str,
     write_to_filename: bool = False,
     keep_filename_column: bool = False,
     output_type: str = "jsonl",
@@ -687,15 +687,30 @@ def write_to_disk(
 
     Args:
         df: A Dask DataFrame.
-        output_file_dir: The output file path.
+        output_path: The output file path.
         write_to_filename: Boolean representing whether to write the filename using the "filename" column.
         keep_filename_column: Boolean representing whether to keep or drop the "filename" column, if it exists.
         output_type: The type of output file to write. Can be "jsonl" or "parquet".
 
     """
-    if write_to_filename and "filename" not in df.columns:
+
+    # output_path is a file name
+    if isinstance(output_path, str) and output_path.endswith(".jsonl"):
+        if df.npartitions == 1:
+            df.map_partitions(
+                _write_to_jsonl_or_parquet, output_path, output_type
+            ).compute()
+            return
+        else:
+            raise RuntimeError(
+                "Could not write multi-partition DataFrame to a single JSONL file. "
+                "Please specify a directory output path or repartition the DataFrame."
+            )
+
+    # output_path is a directory
+    elif write_to_filename and "filename" not in df.columns:
         raise ValueError(
-            "write_using_filename is True but no filename column found in df"
+            "write_to_filename is True but no 'filename' column found in the DataFrame"
         )
 
     if is_cudf_type(df):
@@ -705,11 +720,12 @@ def write_to_disk(
     else:
         output_meta = pd.Series([True], dtype="bool")
 
+    # output_path is a directory
     if write_to_filename and output_type != "bitext":
-        os.makedirs(output_file_dir, exist_ok=True)
+        os.makedirs(output_path, exist_ok=True)
         output = df.map_partitions(
             single_partition_write_with_filename,
-            output_file_dir,
+            output_path,
             keep_filename_column=keep_filename_column,
             output_type=output_type,
             meta=output_meta,
@@ -717,30 +733,20 @@ def write_to_disk(
         )
         output = output.compute()
 
+    # output_path is a directory
     else:
-        if output_type == "jsonl":
-            if is_cudf_type(df):
-                # See open issue here: https://github.com/rapidsai/cudf/issues/15211
-                # df.to_json(output_file_dir, orient="records", lines=True, engine="cudf", force_ascii=False)
-                df.to_json(
-                    output_file_dir, orient="records", lines=True, force_ascii=False
-                )
-            else:
-                df.to_json(
-                    output_file_dir, orient="records", lines=True, force_ascii=False
-                )
-        elif output_type == "parquet":
-            df.to_parquet(output_file_dir, write_index=False)
+        if output_type == "jsonl" or output_type == "parquet":
+            _write_to_jsonl_or_parquet(df, output_path, output_type)
         elif output_type == "bitext":
             if write_to_filename:
-                os.makedirs(output_file_dir, exist_ok=True)
-                tmp_output_file_dir = os.path.join(output_file_dir, ".tmp")
+                os.makedirs(output_path, exist_ok=True)
+                tmp_output_file_dir = os.path.join(output_path, ".tmp")
                 os.makedirs(tmp_output_file_dir, exist_ok=True)
                 file_name = os.path.basename(list(df.filename.unique())[0])
             else:
-                tmp_output_file_dir = os.path.join(output_file_dir, ".tmp")
+                tmp_output_file_dir = os.path.join(output_path, ".tmp")
                 os.makedirs(tmp_output_file_dir, exist_ok=True)
-                file_name = os.path.basename(output_file_dir)
+                file_name = os.path.basename(output_path)
 
             output = df.map_partitions(
                 _single_partition_write_to_simple_bitext,
@@ -751,17 +757,31 @@ def write_to_disk(
             output = output.compute()
             _merge_tmp_simple_bitext_partitions(
                 tmp_output_file_dir,
-                (
-                    output_file_dir
-                    if write_to_filename
-                    else os.path.dirname(output_file_dir)
-                ),
+                (output_path if write_to_filename else os.path.dirname(output_path)),
             )
             shutil.rmtree(tmp_output_file_dir)
         else:
             raise ValueError(f"Unknown output type: {output_type}")
 
-    print(f"Writing to disk complete for {df.npartitions} partitions", flush=True)
+    print(f"Writing to disk complete for {df.npartitions} partition(s)", flush=True)
+
+
+def _write_to_jsonl_or_parquet(
+    df,
+    output_path: str,
+    output_type: Literal["jsonl", "parquet"] = "jsonl",
+):
+    if output_type == "jsonl":
+        if is_cudf_type(df):
+            # See open issue here: https://github.com/rapidsai/cudf/issues/15211
+            # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
+            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+        else:
+            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+    elif output_type == "parquet":
+        df.to_parquet(output_path, write_index=False)
+    else:
+        raise ValueError(f"Unknown output type: {output_type}")
 
 
 def load_object_on_worker(attr, load_object_function, load_object_kwargs):
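The new branch in `write_to_disk` above is the core of the change: a path ending in `.jsonl` is treated as a single output file, which only works for a one-partition DataFrame. A self-contained sketch of both outcomes, with a toy DataFrame and hypothetical file names:

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator.utils.distributed_utils import write_to_disk

pdf = pd.DataFrame({"text": ["hello", "world"]})
df = dd.from_pandas(pdf, npartitions=2)

try:
    # More than one partition plus a ".jsonl" path raises the new RuntimeError.
    write_to_disk(df=df, output_path="single_file.jsonl", output_type="jsonl")
except RuntimeError:
    # Repartitioning to a single partition first makes the single-file branch apply.
    write_to_disk(
        df=df.repartition(npartitions=1),
        output_path="single_file.jsonl",
        output_type="jsonl",
    )
```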
diff --git a/tests/test_io.py b/tests/test_io.py
index b03f26f98..76fd6ade9 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -233,7 +233,7 @@ def test_multifile_multi_partition(self, tmp_path, file_ext, read_f):
         ddf["filename"] = ddf["filename"] + f".{file_ext}"
         write_to_disk(
             df=ddf,
-            output_file_dir=tmp_path / file_ext,
+            output_path=tmp_path / file_ext,
             write_to_filename=True,
             output_type=file_ext,
         )
diff --git a/tests/test_separate_by_metadata.py b/tests/test_separate_by_metadata.py
index 68b57497d..3e01d7f04 100644
--- a/tests/test_separate_by_metadata.py
+++ b/tests/test_separate_by_metadata.py
@@ -30,7 +30,7 @@ def _write_data(num_files, file_ext):
     df = dd.concat(dfs)
     write_to_disk(
         df=df,
-        output_file_dir=str(out_path),
+        output_path=str(out_path),
         write_to_filename=True,
         output_type=file_ext,
     )
diff --git a/tutorials/distributed_data_classification/distributed_data_classification.ipynb b/tutorials/distributed_data_classification/distributed_data_classification.ipynb
index 4b855ba89..e2cdd3e1a 100644
--- a/tutorials/distributed_data_classification/distributed_data_classification.ipynb
+++ b/tutorials/distributed_data_classification/distributed_data_classification.ipynb
@@ -144,7 +144,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 8,
+   "execution_count": null,
    "metadata": {},
    "outputs": [
     {
@@ -175,7 +175,7 @@
    "%%time\n",
    "\n",
    "result_dataset = classifier(dataset=input_dataset)\n",
-   "result_dataset.to_json(output_file_dir=output_file_path, write_to_filename=write_to_filename)"
+   "result_dataset.to_json(output_path=output_file_path, write_to_filename=write_to_filename)"
   ]
  },
 {
diff --git a/tutorials/pretraining-vietnamese-data-curation/pretraining-vietnamese-data-curation.ipynb b/tutorials/pretraining-vietnamese-data-curation/pretraining-vietnamese-data-curation.ipynb
index dae11fa51..6181a8a74 100644
--- a/tutorials/pretraining-vietnamese-data-curation/pretraining-vietnamese-data-curation.ipynb
+++ b/tutorials/pretraining-vietnamese-data-curation/pretraining-vietnamese-data-curation.ipynb
@@ -632,7 +632,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 9,
+   "execution_count": null,
    "metadata": {},
    "outputs": [
     {
@@ -662,7 +662,7 @@
    "id_dataset = add_id(dataset)\n",
    "\n",
    "# Save the dataset with added IDs to disk\n",
-   "write_to_disk(id_dataset.df, output_file_dir=added_id_output_path, write_to_filename=True, output_type=\"parquet\")"
+   "write_to_disk(id_dataset.df, output_path=added_id_output_path, write_to_filename=True, output_type=\"parquet\")"
   ]
  },
 {
@@ -848,7 +848,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 16,
+   "execution_count": null,
    "metadata": {},
    "outputs": [
     {
@@ -879,7 +879,7 @@
    "]\n",
    "\n",
    "# Save the final deduplicated dataset\n",
-   "write_to_disk(result, output_file_dir=deduped_output_dir, write_to_filename=True, output_type=\"parquet\")"
+   "write_to_disk(result, output_path=deduped_output_dir, write_to_filename=True, output_type=\"parquet\")"
   ]
  },
 {
@@ -1196,7 +1196,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 29,
+   "execution_count": null,
    "metadata": {},
    "outputs": [
     {
@@ -1222,7 +1222,7 @@
    "filtered_dataset = filter_pipeline(target_dataset)\n",
    "\n",
    "# Save the filtered dataset\n",
-   "write_to_disk(filtered_dataset.df, output_file_dir=CF_output_path, write_to_filename=True, output_type=\"parquet\")"
+   "write_to_disk(filtered_dataset.df, output_path=CF_output_path, write_to_filename=True, output_type=\"parquet\")"
   ]
  },
 {
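The tutorial cells above keep writing parquet output to a directory; only paths ending in `.jsonl` take the new single-file branch. A minimal sketch of the tutorial-style calls, with hypothetical paths and a toy dataset standing in for the curated data:

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

# Toy data standing in for the tutorial's curated output; paths are hypothetical.
pdf = pd.DataFrame({"id": [0, 1], "text": ["xin chào", "thế giới"]})
dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2))

# Parquet output is still written to a directory of part files.
dataset.to_parquet(output_path="curated_parquet/", write_to_filename=False)

# A single JSONL file works after coalescing to one partition.
dataset.repartition(npartitions=1).to_json(output_path="curated.jsonl")
```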