diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py
index 796beabb2..482456a43 100644
--- a/nemo_curator/datasets/doc_dataset.py
+++ b/nemo_curator/datasets/doc_dataset.py
@@ -12,7 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import List, Optional, Union
+import os
+from typing import Any, List, Literal, Optional, Union
 
 import dask.dataframe as dd
 
@@ -29,26 +30,40 @@ class DocumentDataset:
     def __init__(self, dataset_df: dd.DataFrame):
         self.df = dataset_df
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self.df)
 
-    def persist(self):
+    # `def persist(self) -> Self` requires Python 3.11 or higher
+    def persist(self) -> "DocumentDataset":
         return DocumentDataset(self.df.persist())
 
-    def head(self, n=5):
+    def head(self, n: int = 5) -> Any:
         return self.df.head(n)
 
     @classmethod
     def read_json(
         cls,
         input_files: Union[str, List[str]],
-        backend: str = "pandas",
+        backend: Literal["pandas", "cudf"] = "pandas",
         files_per_partition: int = 1,
         add_filename: bool = False,
         input_meta: Union[str, dict] = None,
         columns: Optional[List[str]] = None,
         **kwargs,
-    ):
+    ) -> "DocumentDataset":
+        """
+        Read JSON or JSONL file(s).
+
+        Args:
+            input_files: The path of the input file(s).
+            backend: The backend to use for reading the data.
+            files_per_partition: The number of files to read per partition.
+            add_filename: Whether to add a "filename" column to the DataFrame.
+            input_meta: A dictionary or a string formatted as a dictionary, which outlines
+                the field names and their respective data types within the JSONL input file.
+            columns: If not None, only these columns will be read from the file.
+
+        """
         return cls(
             _read_json_or_parquet(
                 input_files=input_files,
@@ -65,13 +80,25 @@ def read_json(
     @classmethod
     def read_parquet(
         cls,
-        input_files,
-        backend="pandas",
-        files_per_partition=1,
-        add_filename=False,
+        input_files: Union[str, List[str]],
+        backend: Literal["pandas", "cudf"] = "pandas",
+        files_per_partition: int = 1,
+        add_filename: bool = False,
         columns: Optional[List[str]] = None,
         **kwargs,
-    ):
+    ) -> "DocumentDataset":
+        """
+        Read Parquet file(s).
+
+        Args:
+            input_files: The path of the input file(s).
+            backend: The backend to use for reading the data.
+            files_per_partition: The number of files to read per partition.
+            add_filename: Whether to add a "filename" column to the DataFrame.
+            columns: If not None, only these columns will be read from the file.
+                There is a significant performance gain when specifying columns for Parquet files.
+
+        """
         return cls(
             _read_json_or_parquet(
                 input_files=input_files,
@@ -87,13 +114,24 @@ def read_parquet(
     @classmethod
     def read_pickle(
         cls,
-        input_files,
-        backend="pandas",
-        files_per_partition=1,
-        add_filename=False,
+        input_files: Union[str, List[str]],
+        backend: Literal["pandas", "cudf"] = "pandas",
+        files_per_partition: int = 1,
+        add_filename: bool = False,
         columns: Optional[List[str]] = None,
         **kwargs,
-    ):
+    ) -> "DocumentDataset":
+        """
+        Read Pickle file(s).
+
+        Args:
+            input_files: The path of the input file(s).
+            backend: The backend to use for reading the data.
+            files_per_partition: The number of files to read per partition.
+            add_filename: Whether to add a "filename" column to the DataFrame.
+            columns: If not None, only these columns will be read from the file.
+
+        """
         return cls(
             read_data(
                 input_files=input_files,
@@ -108,12 +146,12 @@ def read_pickle(
 
     def to_json(
         self,
-        output_file_dir,
-        write_to_filename=False,
-        keep_filename_column=False,
+        output_file_dir: str,
+        write_to_filename: bool = False,
+        keep_filename_column: bool = False,
     ):
         """
-        See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
+        See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
 
         """
         write_to_disk(
@@ -126,12 +164,12 @@ def to_json(
 
     def to_parquet(
         self,
-        output_file_dir,
-        write_to_filename=False,
-        keep_filename_column=False,
+        output_file_dir: str,
+        write_to_filename: bool = False,
+        keep_filename_column: bool = False,
     ):
         """
-        See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
+        See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
 
         """
         write_to_disk(
@@ -144,8 +182,8 @@ def to_parquet(
 
     def to_pickle(
         self,
-        output_file_dir,
-        write_to_filename=False,
+        output_file_dir: str,
+        write_to_filename: bool = False,
     ):
         raise NotImplementedError("DocumentDataset does not support to_pickle yet")
 
@@ -190,7 +228,7 @@ def to_pandas(self):
 def _read_json_or_parquet(
     input_files: Union[str, List[str]],
     file_type: str,
-    backend: str,
+    backend: Literal["cudf", "pandas"],
     files_per_partition: int,
     add_filename: bool,
     input_meta: Union[str, dict] = None,
@@ -217,8 +255,8 @@ def _read_json_or_parquet(
     file_ext = "." + file_type
 
     if isinstance(input_files, list):
-        # List of jsonl or parquet files
-        if all(f.endswith(file_ext) for f in input_files):
+        # List of files
+        if all(os.path.isfile(f) for f in input_files):
             raw_data = read_data(
                 input_files,
                 file_type=file_type,
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index 53d40ce01..4e695baf8 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -22,7 +22,7 @@
 from contextlib import nullcontext
 from datetime import datetime
 from pathlib import Path
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Literal, Optional, Union
 
 import dask.dataframe as dd
 import numpy as np
@@ -57,21 +57,23 @@ def _enable_spilling():
 
 
 def start_dask_gpu_local_cluster(
-    nvlink_only=False,
-    protocol="tcp",
-    rmm_pool_size="1024M",
-    enable_spilling=True,
-    set_torch_to_use_rmm=True,
-    rmm_async=True,
-    rmm_maximum_pool_size=None,
-    rmm_managed_memory=False,
-    rmm_release_threshold=None,
+    nvlink_only: bool = False,
+    protocol: str = "tcp",
+    rmm_pool_size: Optional[Union[int, str]] = "1024M",
+    enable_spilling: bool = True,
+    set_torch_to_use_rmm: bool = True,
+    rmm_async: bool = True,
+    rmm_maximum_pool_size: Optional[Union[int, str]] = None,
+    rmm_managed_memory: bool = False,
+    rmm_release_threshold: Optional[Union[int, str]] = None,
     **cluster_kwargs,
 ) -> Client:
     """
     This function sets up a Dask cluster across all the GPUs present on the machine.
 
+    See get_client function for parameters.
+
     """
     extra_kwargs = (
         {
@@ -111,12 +113,16 @@ def start_dask_gpu_local_cluster(
 
 
 def start_dask_cpu_local_cluster(
-    n_workers=os.cpu_count(), threads_per_worker=1, **cluster_kwargs
+    n_workers: Optional[int] = os.cpu_count(),
+    threads_per_worker: int = 1,
+    **cluster_kwargs,
 ) -> Client:
     """
     This function sets up a Dask cluster across all the CPUs present on the machine.
 
+    See get_client function for parameters.
+
     """
     cluster = LocalCluster(
         n_workers=n_workers,
@@ -262,10 +268,10 @@ def _set_torch_to_use_rmm():
 
 
 def read_single_partition(
-    files,
-    backend="cudf",
-    filetype="jsonl",
-    add_filename=False,
+    files: List[str],
+    backend: Literal["cudf", "pandas"] = "cudf",
+    filetype: str = "jsonl",
+    add_filename: bool = False,
     input_meta: Union[str, dict] = None,
     columns: Optional[List[str]] = None,
     **kwargs,
@@ -353,7 +359,10 @@ def read_single_partition(
 
 
 def read_pandas_pickle(
-    file, add_filename=False, columns=None, **kwargs
+    file: str,
+    add_filename: bool = False,
+    columns: Optional[List[str]] = None,
+    **kwargs,
 ) -> pd.DataFrame:
     """
     This function reads a pickle file with Pandas.
@@ -361,6 +370,7 @@ def read_pandas_pickle(
     Args:
         file: The path to the pickle file to read.
         add_filename: Whether to add a "filename" column to the DataFrame.
+        columns: If not None, only these columns will be read from the file.
 
     Returns:
         A Pandas DataFrame.
@@ -375,9 +385,9 @@ def read_pandas_pickle(
 
 
 def read_data(
-    input_files,
+    input_files: Union[str, List[str]],
     file_type: str = "pickle",
-    backend: str = "cudf",
+    backend: Literal["cudf", "pandas"] = "cudf",
     files_per_partition: int = 1,
     add_filename: bool = False,
     input_meta: Union[str, dict] = None,
@@ -406,6 +416,9 @@ def read_data(
         # Try using cuDF. If not availible will throw an error.
         test_obj = cudf.Series
 
+    if isinstance(input_files, str):
+        input_files = [input_files]
+
     if file_type == "pickle":
         df = read_pandas_pickle(
             input_files[0], add_filename=add_filename, columns=columns, **kwargs
@@ -415,15 +428,30 @@
             df = df.to_backend("cudf")
 
     elif file_type in ["json", "jsonl", "parquet"]:
+        assert len(input_files) > 0
+
+        input_extensions = {os.path.splitext(f)[-1] for f in input_files}
+        if len(input_extensions) != 1:
+            raise RuntimeError(
+                "All files being read must have the same file type. "
+                "Please check your input directory or list of files to ensure this. "
+                "To generate a list of files with a given file type in your directory, "
+                "please use the nemo_curator.utils.file_utils.get_all_files_paths_under "
+                "function with the `keep_extensions` parameter."
+            )
+
         print(f"Reading {len(input_files)} files", flush=True)
         input_files = sorted(input_files)
+
         if files_per_partition > 1:
             input_files = [
                 input_files[i : i + files_per_partition]
                 for i in range(0, len(input_files), files_per_partition)
             ]
+
         else:
             input_files = [[file] for file in input_files]
+
         return dd.from_map(
             read_single_partition,
             input_files,
@@ -435,8 +463,10 @@
             columns=columns,
             **kwargs,
         )
+
     else:
         raise RuntimeError("Could not read data, please check file type")
+
     return df
 
 
@@ -496,9 +526,9 @@ def process_all_batches(
 
 def single_partition_write_with_filename(
     df,
-    output_file_dir,
-    keep_filename_column=False,
-    output_type="jsonl",
+    output_file_dir: str,
+    keep_filename_column: bool = False,
+    output_type: str = "jsonl",
 ):
     """
     This function processes a DataFrame and writes it to disk
@@ -506,8 +536,8 @@ def single_partition_write_with_filename(
     Args:
         df: A DataFrame.
        output_file_dir: The output file path.
-        keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
-        output_type="jsonl": The type of output file to write.
+        keep_filename_column: Boolean representing whether to keep or drop the "filename" column, if it exists.
+        output_type: The type of output file to write. Can be "jsonl" or "parquet".
     Returns:
         If the DataFrame is non-empty, return a Series containing a single element, True.
         If the DataFrame is empty, return a Series containing a single element, False.
@@ -575,10 +605,10 @@ def single_partition_write_with_filename(
 
 def write_to_disk(
     df,
-    output_file_dir,
-    write_to_filename=False,
-    keep_filename_column=False,
-    output_type="jsonl",
+    output_file_dir: str,
+    write_to_filename: bool = False,
+    keep_filename_column: bool = False,
+    output_type: str = "jsonl",
 ):
     """
     This function writes a Dask DataFrame to the specified file path.
@@ -588,9 +618,9 @@
     Args:
         df: A Dask DataFrame.
         output_file_dir: The output file path.
-        write_to_filename: Whether to write the filename using the "filename" column.
-        keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
-        output_type="jsonl": The type of output file to write.
+        write_to_filename: Boolean representing whether to write the filename using the "filename" column.
+        keep_filename_column: Boolean representing whether to keep or drop the "filename" column, if it exists.
+        output_type: The type of output file to write. Can be "jsonl" or "parquet".
 
     """
     if write_to_filename and "filename" not in df.columns:
@@ -665,7 +695,7 @@ def load_object_on_worker(attr, load_object_function, load_object_kwargs):
     return obj
 
 
-def offload_object_on_worker(attr):
+def offload_object_on_worker(attr: str):
     """
     This function deletes an existing attribute from a Dask worker.
 
@@ -702,7 +732,19 @@ def get_current_client():
         return None
 
 
-def performance_report_if(path=None, report_name="dask-profile.html"):
+def performance_report_if(
+    path: Optional[str] = None, report_name: str = "dask-profile.html"
+):
+    """
+    Generates a performance report if a valid path is provided, or returns a
+    no-op context manager if not.
+
+    Args:
+        path: The directory path where the performance report should be saved.
+            If None, no report is generated.
+        report_name: The name of the report file.
+
+    """
     if path is not None:
         return performance_report(os.path.join(path, report_name))
     else:
@@ -712,7 +754,10 @@
 def performance_report_if_with_ts_suffix(
     path: Optional[str] = None, report_name: str = "dask-profile"
 ):
-    """Suffixes the report_name with the timestamp"""
+    """
+    Same as performance_report_if, except it suffixes the report_name with the timestamp.
+
+    """
     return performance_report_if(
         path=path,
         report_name=f"{report_name}-{datetime.now().strftime('%Y%m%d_%H%M%S')}.html",
diff --git a/nemo_curator/utils/file_utils.py b/nemo_curator/utils/file_utils.py
index 288828a70..e919c88bd 100644
--- a/nemo_curator/utils/file_utils.py
+++ b/nemo_curator/utils/file_utils.py
@@ -16,6 +16,7 @@
 import json
 import os
 import pathlib
+import warnings
 from functools import partial, reduce
 from typing import List, Optional, Union
 
@@ -45,7 +46,42 @@ def expand_outdir_and_mkdir(outdir):
     return outdir
 
 
-def get_all_files_paths_under(root, recurse_subdirectories=True, followlinks=False):
+def filter_files_by_extension(
+    files_list: List[str],
+    keep_extensions: Union[str, List[str]],
+) -> List[str]:
+    """
+    Given a list of files, filter it to only include files matching given extension(s).
+
+    Args:
+        files_list: List of files.
+        keep_extensions: A string (e.g., "json") or a list of strings (e.g., ["json", "parquet"])
+            representing which file types to keep from files_list.
+
+    """
+    filtered_files = []
+
+    if isinstance(keep_extensions, str):
+        keep_extensions = [keep_extensions]
+
+    file_extensions = [s if s.startswith(".") else "." + s for s in keep_extensions]
+
+    for file in files_list:
+        if file.endswith(tuple(file_extensions)):
+            filtered_files.append(file)
+
+    if len(files_list) != len(filtered_files):
+        warnings.warn("Skipped at least one file due to unmatched file extension(s).")
+
+    return filtered_files
+
+
+def get_all_files_paths_under(
+    root: str,
+    recurse_subdirectories: bool = True,
+    followlinks: bool = False,
+    keep_extensions: Optional[Union[str, List[str]]] = None,
+) -> List[str]:
     """
     This function returns a list of all the files under a specified directory.
     Args:
@@ -54,6 +90,9 @@ def get_all_files_paths_under(root, recurse_subdirectories=True, followlinks=Fal
             Please note that this can be slow for large number of files.
         followlinks: Whether to follow symbolic links.
+        keep_extensions: A string or list of strings representing a file type
+            or multiple file types to include in the output, e.g.,
+            "jsonl" or ["jsonl", "parquet"].
 
     """
     if recurse_subdirectories:
         file_ls = [
@@ -65,6 +104,10 @@ def get_all_files_paths_under(root, recurse_subdirectories=True, followlinks=Fal
         file_ls = [entry.path for entry in os.scandir(root)]
 
     file_ls.sort()
+
+    if keep_extensions is not None:
+        file_ls = filter_files_by_extension(file_ls, keep_extensions)
+
     return file_ls
 
 
@@ -146,7 +189,10 @@ def _update_filetype(file_set, old_file_type, new_file_type):
 
 
 def get_batched_files(
-    input_file_path, output_file_path, input_file_type, batch_size=64
+    input_file_path: str,
+    output_file_path: str,
+    input_file_type: str,
+    batch_size: int = 64,
 ):
     """
     This function returns a batch of files that still remain to be processed.
@@ -329,7 +375,7 @@ def separate_by_metadata(
     return delayed(reduce)(merge_counts, delayed_counts)
 
 
-def parse_str_of_num_bytes(s, return_str=False):
+def parse_str_of_num_bytes(s: str, return_str: bool = False) -> Union[str, int]:
     try:
         power = "kmg".find(s[-1].lower()) + 1
         size = float(s[:-1]) * 1024**power
@@ -342,7 +388,10 @@ def parse_str_of_num_bytes(s, return_str=False):
 
 
 def _save_jsonl(documents, output_path, start_index=0, max_index=10000, prefix=None):
-    """Worker function to write out the data to jsonl files"""
+    """
+    Worker function to write out the data to jsonl files
+
+    """
 
     def _encode_text(document):
         return document.strip().encode("utf-8")
@@ -375,7 +424,11 @@ def _name(start_index, npad, prefix, i):
 
 
 def reshard_jsonl(
-    input_dir, output_dir, output_file_size="100M", start_index=0, file_prefix=""
+    input_dir: str,
+    output_dir: str,
+    output_file_size: str = "100M",
+    start_index: int = 0,
+    file_prefix: str = "",
 ):
     """
     Reshards a directory of jsonl files to have a new (approximate) file size for each shard
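For reviewers, a minimal usage sketch of the surface area this patch touches. It is not part of the patch itself: the directory paths, column names, and files_per_partition value are hypothetical placeholders, and the package-level import of DocumentDataset is assumed to match the existing nemo_curator.datasets exports. Only parameters introduced or annotated in this diff (keep_extensions, columns, add_filename, write_to_filename, keep_filename_column) are exercised.

# Sketch only; paths and column names below are placeholders.
from nemo_curator.datasets import DocumentDataset  # assumed package-level export
from nemo_curator.utils.file_utils import get_all_files_paths_under

# Collect only JSONL files. The new keep_extensions filter guarantees a single
# extension, which read_data() now enforces for json/jsonl/parquet inputs.
jsonl_files = get_all_files_paths_under(
    "/path/to/input_dir",  # hypothetical input directory
    recurse_subdirectories=True,
    keep_extensions="jsonl",
)

dataset = DocumentDataset.read_json(
    jsonl_files,
    backend="pandas",
    files_per_partition=2,
    add_filename=True,  # keep a "filename" column per document
    columns=["id", "text"],  # hypothetical field names; other columns are dropped
)

# Write one output file per original input file, preserving the "filename" column.
dataset.to_json(
    "/path/to/output_dir",  # hypothetical output directory
    write_to_filename=True,
    keep_filename_column=True,
)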