diff --git a/docs/nanoset.md b/docs/nanoset.md index 02649bd0..9dce21b7 100644 --- a/docs/nanoset.md +++ b/docs/nanoset.md @@ -1,41 +1,42 @@ # Nanosets -Nanotron incorporates [`Nanosets`](../src/nanotron/data/nanoset.py), a kind of datasets based on [numpy memory-mapped arrays](https://numpy.org/doc/stable/reference/generated/numpy.memmap.html). `Nanosets` are capable of serving batches from files containing pre-tokenized datasets. They allow reading tokens from one or multiple datasets and even specifying the weight of each dataset when building batches. +Nanotron incorporates [`Nanosets`](../src/nanotron/data/nanoset.py), a dataset for processing tokenized documents with [`datatrove`](https://github.com/huggingface/datatrove). They allow reading tokens from one or multiple datasets and even specifying the weight of each dataset when building batches. ## Install To use `Nanosets`, it's necessary to install Nanotron with the `nanosets` flavor. ``` -pip install -e '.[nanosets]' +pip install nanotron[nanosets] ``` This will install the following dependencies: -- `transformers`: To tokenize the datasets -- `datasets`: To preprocess the datasets +- `datatrove`: To preprocess the datasets - `numba`: To compile helper functions in order to speed up the creation of `Nanosets` +- `transformers`: For the tokenizers ## Data pre-processing -To use these datasets, first, we need to preprocess the data. The input format can either be a column of a Hugging Face Dataset or a .json file containing a text sample per line. For example: +To use this dataset, first, we need to preprocess the data using `datatrove`'s `DocumentTokenizer` pipeline. We invite you to take a look at `datatrove`, since it contains multiple features that allow, for example, filter out documents based on specific rules/criteria, extract text content from raw formats or scheduling the preprocessing in a Slurm cluster. We have also added a simple script capable of tokenizing datasets. -
-{"src": "www.nvidia.com", "text": "The quick brown fox", "type": "Eng", "id": "0", "title": "First Part"} -{"src": "The Internet", "text": "jumps over the lazy dog", "type": "Eng", "id": "42", "title": "Second Part"} -- -The preprocessing is done using the [`tools/preprocess_data.py`](../tools/preprocess_data.py) script. Below we show an example for processing a corpus with the Llama2 tokenizer. +The preprocessing is done using the [`tools/preprocess_data.py`](../tools/preprocess_data.py) script. The input format can either be a Hugging Face Dataset, a path to a `.jsonl` or a path to a folder containing multiple `.jsonl` files. Below we show an example for processing a Hugging Face Dataset from the Hub with the Llama3 tokenizer.
-torchrun --nproc-per-node 16 tools/preprocess_data.py \ - --input HuggingFaceH4/testing_alpaca_small \ - --split train \ - --column completion \ - --output-prefix datasets/testing_alpaca_small \ - --tokenizer-name-or-path openai-community/gpt2 +python3 tools/preprocess_data.py \ + --tokenizer-name-or-path meta-llama/Meta-Llama-3-8B \ + --output-folder datasets/emotion \ + --n-tasks 16 \ + hf \ + --dataset dair-ai/emotion \-The preprocessing script has to be launched with `torchrun` in order to spawn `--nproc-per-node` workers that will preprocess the dataset concurrently. The `--input` dataset can be either a Hugging Face Dataset from the Hub or a `.json` file. The processed dataset will be stored in *`--output-prefix`_input_ids.npy*. In `--tokenizer-name-or-path`, we will have to specify a tokenizer in the same way as we do when using `AutoTokenizers.from_pretrained(...)`. +First with `--tokenizer-name-or-path` we will specify a tokenizer in the same way as we do when using `AutoTokenizers.from_pretrained(...)`. Then we specify the `--output-folder` where we will store the tokenized documents and the number of workers with `--n-tasks`. Finally we will indicate the type of dataset (whether if it's a Hugging Face Dataset ["**hf**"] or in jsonl ["**jsonl**"] format) and the dataset that we want to preprocess. Check the different settings with `python3 tools/preprocess_data.py --help`, `python3 tools/preprocess_data.py hf --help` & `python3 tools/preprocess_data.py jsonl --help`. -The output will be one file named, in this case, `datasets/testing_alpaca_small_input_ids.npy`. We will then have to specify this file in the `dataset_path` field in the config file. +Every worker will store in `--output-folder` 3 different kind of files: +- `*.ds` Containing the tokenized documents +- `*.ds.index` Containing the bounds of each tokenized document +- `*.ds.metadata` Containing the number of tokens and tokenizer used + +> [!IMPORTANT] +Remember to introduce the type of dataset to process. e.g. python3 tools/preprocess_data.py --tokenizer-name-or-path gpt2 --n-tasks 16 **jsonl** --dataset raw_datasets/c4-es-json-files ## Working with Nanosets To work with `Nanosets`, we just need to configure 1 argument: -1. `dataset_path`: This argument specifies the file or files that will compose the `Nanoset`. There are 3 ways to specify it: +1. `dataset_folder`: This argument specifies the file or files that will compose the `Nanoset`. There are 3 ways to specify it: 1. If we specify a single path, we will create a `Nanoset` from a single dataset file. ```yaml data_stages: @@ -43,7 +44,7 @@ To work with `Nanosets`, we just need to configure 1 argument: start_training_step: 1 data: dataset: - dataset_path: datasets/SlimPajama-6B_input_ids.npy + dataset_folder: datasets/SlimPajama-6B num_loading_workers: 0 seed: 1234 ``` @@ -54,9 +55,9 @@ To work with `Nanosets`, we just need to configure 1 argument: start_training_step: 15 data: dataset: - dataset_path: - - datasets/SlimPajama-6B_input_ids.npy - - datasets/testing_alpaca_small_input_ids.npy + dataset_folder: + - datasets/SlimPajama-6B + - datasets/testing_alpaca_small num_loading_workers: 0 seed: 1234 ``` @@ -67,9 +68,9 @@ To work with `Nanosets`, we just need to configure 1 argument: start_training_step: 25 data: dataset: - dataset_path: - datasets/SlimPajama-6B_input_ids.npy: 0.8 - datasets/testing_alpaca_small_input_ids.npy: 0.2 + dataset_folder: + datasets/SlimPajama-6B: 0.8 + datasets/testing_alpaca_small: 0.2 num_loading_workers: 0 seed: 1234 ``` @@ -82,7 +83,10 @@ torchrun --nproc-per-node 8 run_train.py --config configs/config_nanoset.yaml ``` ## Under the hood -`Nanosets` are responsible of building samples of `sequence length + 1` tokens from the preprocessed dataset files. The `dataset lengths` of each dataset will be determined by the `(dataset_number_of_tokens - 1) / sequence length`, discarding the last sample if its length < `sequence length`. +`Nanosets` are responsible of building samples of `sequence length + 1` tokens from the preprocessed dataset files. Despite most of the extracting logic lies in `DatatroveFolderDataset`, `Nanosets` will take care of the following: +1. Creating dataset mixtures from different dataset folder paths +2. Ensure that in each epoch, we consume each sample only once +3. Ensure that we never exhaust the `DataLoader` Based on the `dataset lengths`, the `dataset weights` and the `number of samples per epoch` (defined as the `sum(dataset lengths)`), we build the two indexes we need in order to extract samples from the `Nanoset` ([build_nanoset_index_helper](../src/nanotron/data/nanoset.py)): - `dataset index`: Contains the index of the dataset from the list of `dataset paths` from which to extract the sample, respecting the established dataset weight. diff --git a/examples/config_nanoset.yaml b/examples/config_nanoset.yaml index 31f23bf0..127ddb5e 100644 --- a/examples/config_nanoset.yaml +++ b/examples/config_nanoset.yaml @@ -7,25 +7,25 @@ checkpoints: data_stages: - data: dataset: - dataset_path: datasets/testing_alpaca_small_input_ids.npy + dataset_folder: datasets/c4-es/tokenized num_loading_workers: 1 seed: 42 name: General purpose training (Single dataset) start_training_step: 1 - data: dataset: - dataset_path: - - datasets/yelp_review_full_input_ids.npy - - datasets/testing_alpaca_small_input_ids.npy + dataset_folder: + - datasets/SlimPajama-6B/tokenized + - datasets/c4-es/tokenized num_loading_workers: 1 seed: 42 name: Second purpose training (> 1 dataset) start_training_step: 15 - data: dataset: - dataset_path: - datasets/testing_alpaca_small_input_ids.npy: 0.8 - datasets/yelp_review_full_input_ids.npy: 0.2 + dataset_folder: + datasets/SlimPajama-6B/tokenized: 0.8 + datasets/c4-es/tokenized: 0.2 num_loading_workers: 1 seed: 42 name: Third purpose training (Blended dataset) @@ -57,7 +57,7 @@ model: initializer_range: 0.02 intermediate_size: 64 is_llama_config: true - max_position_embeddings: 256 + max_position_embeddings: 1024 num_attention_heads: 4 num_hidden_layers: 2 num_key_value_heads: 4 @@ -67,7 +67,7 @@ model: rope_scaling: null tie_word_embeddings: true use_cache: true - vocab_size: 32000 + vocab_size: 50257 optimizer: accumulate_grad_in_fp32: true clip_grad: 1.0 @@ -88,11 +88,11 @@ optimizer: weight_decay: 0.01 zero_stage: 0 parallelism: - dp: 2 + dp: 1 expert_parallel_size: 1 pp: 1 pp_engine: 1f1b - tp: 2 + tp: 1 tp_linear_async_communication: true tp_mode: REDUCE_SCATTER profiler: null @@ -105,6 +105,6 @@ tokens: limit_test_batches: 0 limit_val_batches: 0 micro_batch_size: 2 - sequence_length: 128 + sequence_length: 1024 train_steps: 200 val_check_interval: -1 diff --git a/pyproject.toml b/pyproject.toml index e65f37a5..6a0cfb83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ fast-modeling = [ nanosets = [ "transformers", - "datasets", + "datatrove[io,processing]@git+https://github.com/huggingface/datatrove", "numba", ] diff --git a/run_train.py b/run_train.py index b33231f4..021d955d 100644 --- a/run_train.py +++ b/run_train.py @@ -143,17 +143,17 @@ def get_dataloader_from_data_stage( elif isinstance(data.dataset, NanosetDatasetsArgs): # Get tokenizer cardinality tokenizer = AutoTokenizer.from_pretrained(trainer.config.tokenizer.tokenizer_name_or_path) - token_dtype = np.int32 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else np.uint16 + token_size = 4 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else 2 del tokenizer # Create Nanoset from nanotron.data.nanoset import Nanoset with main_rank_first(trainer.parallel_context.world_pg): train_dataset = Nanoset( - dataset_paths=data.dataset.dataset_path, + dataset_folders=data.dataset.dataset_folder, dataset_weights=data.dataset.dataset_weights, sequence_length=trainer.sequence_length, - token_dtype=token_dtype, + token_size=token_size, train_split_num_samples=trainer.config.tokens.train_steps * trainer.global_batch_size, random_seed=data.seed, ) diff --git a/src/nanotron/config/config.py b/src/nanotron/config/config.py index 6f19de0a..05b49955 100644 --- a/src/nanotron/config/config.py +++ b/src/nanotron/config/config.py @@ -93,18 +93,18 @@ def __post_init__(self): @dataclass class NanosetDatasetsArgs: - dataset_path: Union[str, dict, List[str]] + dataset_folder: Union[str, dict, List[str]] def __post_init__(self): - if isinstance(self.dataset_path, str): # Case 1: 1 Dataset file - self.dataset_path = [self.dataset_path] + if isinstance(self.dataset_folder, str): # Case 1: 1 Dataset file + self.dataset_folder = [self.dataset_folder] self.dataset_weights = [1] - elif isinstance(self.dataset_path, List): # Case 2: > 1 Dataset file + elif isinstance(self.dataset_folder, List): # Case 2: > 1 Dataset file self.dataset_weights = None # Set to None so we consume all the samples randomly - elif isinstance(self.dataset_path, dict): # Case 3: dict with > 1 dataset_path and weights - tmp_dataset_path = self.dataset_path.copy() - self.dataset_path = list(tmp_dataset_path.keys()) - self.dataset_weights = list(tmp_dataset_path.values()) + elif isinstance(self.dataset_folder, dict): # Case 3: dict with > 1 dataset_folder and weights + tmp_dataset_folder = self.dataset_folder.copy() + self.dataset_folder = list(tmp_dataset_folder.keys()) + self.dataset_weights = list(tmp_dataset_folder.values()) @dataclass diff --git a/src/nanotron/data/collator.py b/src/nanotron/data/collator.py new file mode 100644 index 00000000..199527e1 --- /dev/null +++ b/src/nanotron/data/collator.py @@ -0,0 +1,80 @@ +import dataclasses +from typing import Dict, List, Union + +import numpy as np +import torch +from nanotron import distributed as dist +from nanotron.parallel.context import ParallelContext +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer + + +@dataclasses.dataclass +class NanosetDataCollatorForCLM: + """ + Data collator used for causal language modeling with Nanosets dataset. + + - input_pp_rank: Discards last input id token + - output_pp_rank: Discards first label id token + - other pp ranks: Don't have data. Instead, we use `TensorPointer` to point to the rank having the data. + """ + + sequence_length: int + input_pp_rank: int + output_pp_rank: int + parallel_context: ParallelContext + + def __call__(self, examples: List[Dict[str, List[np.ndarray]]]) -> Dict[str, Union[torch.Tensor, TensorPointer]]: + # Process the case when current rank doesn't require data. We return `TensorPointer` that points to ranks having the data. + current_pp_rank = dist.get_rank(self.parallel_context.pp_pg) + if current_pp_rank not in [ + self.input_pp_rank, + self.output_pp_rank, + ]: + assert all(len(example) == 0 for example in examples) + return { + "input_ids": TensorPointer(group_rank=self.input_pp_rank), + "input_mask": TensorPointer(group_rank=self.input_pp_rank), + "label_ids": TensorPointer(group_rank=self.output_pp_rank), + "label_mask": TensorPointer(group_rank=self.output_pp_rank), + } + + # Make sure we load only what's necessary, ie we only load a `input_ids` column. + assert all(list(example.keys()) == ["input_ids"] for example in examples) + + # TODO @nouamanetazi: Is it better to have examples as np.array or torch.Tensor? + input_ids = torch.vstack([examples[i]["input_ids"] for i in range(len(examples))]) # (b, s) + batch_size, expanded_input_length = input_ids.shape + + result: Dict[str, Union[torch.LongTensor, TensorPointer]] = {} + + result["input_ids"] = TensorPointer(group_rank=self.input_pp_rank) + result["input_mask"] = TensorPointer(group_rank=self.input_pp_rank) + result["label_ids"] = TensorPointer(group_rank=self.output_pp_rank) + result["label_mask"] = TensorPointer(group_rank=self.output_pp_rank) + + assert ( + expanded_input_length == self.sequence_length + 1 + ), f"Samples should be of length {self.sequence_length + 1} (seq_len+1), but got {expanded_input_length}" + + # Process inputs: last token is the label + if current_pp_rank == self.input_pp_rank: + result["input_ids"] = input_ids[:, :-1] + result["input_mask"] = torch.ones((batch_size, self.sequence_length), dtype=torch.bool) + + # Process labels: shift them to the left + if current_pp_rank == self.output_pp_rank: + result["label_ids"] = input_ids[:, 1:] + result["label_mask"] = torch.ones((batch_size, self.sequence_length), dtype=torch.bool) + + if isinstance(result["input_ids"], torch.Tensor) and result["input_ids"].shape[-1] != self.sequence_length: + raise ValueError( + f"`labels` are incorrectly preprocessed. `labels` length is {result['input_ids'].shape[-1]}, but should be" + f" {self.sequence_length}." + ) + if isinstance(result["label_ids"], torch.Tensor) and result["label_ids"].shape[-1] != self.sequence_length: + raise ValueError( + f"`labels` are incorrectly preprocessed. `labels` length is {result['label_ids'].shape[-1]}, but should be" + f" {self.sequence_length}." + ) + + return result diff --git a/src/nanotron/data/dataloader_builder.py b/src/nanotron/data/dataloader_builder.py index 4719c476..9d3285f6 100644 --- a/src/nanotron/data/dataloader_builder.py +++ b/src/nanotron/data/dataloader_builder.py @@ -1,7 +1,7 @@ import nanotron.distributed as dist from nanotron import logging +from nanotron.data.collator import NanosetDataCollatorForCLM from nanotron.dataloader import ( - DataCollatorForCLM, EmptyInfiniteDataset, get_dataloader_worker_init, get_sampler, @@ -32,7 +32,7 @@ def build_nanoset_dataloader( # No need to spawn a lot of workers, we can just use main dataloader_num_workers = 0 - data_collator = DataCollatorForCLM( + data_collator = NanosetDataCollatorForCLM( sequence_length=sequence_length, input_pp_rank=input_pp_rank, output_pp_rank=output_pp_rank, diff --git a/src/nanotron/data/nanoset.py b/src/nanotron/data/nanoset.py index 9d62b33d..90200967 100644 --- a/src/nanotron/data/nanoset.py +++ b/src/nanotron/data/nanoset.py @@ -1,7 +1,10 @@ +import os +import warnings from typing import Dict, List, Tuple, Union import numpy as np import torch +from datatrove.utils.dataset import DatatroveFolderDataset from nanotron import logging from nanotron.data.utils import count_dataset_indexes, normalize from nanotron.logging import log_rank @@ -15,49 +18,60 @@ class Nanoset(torch.utils.data.Dataset): The Nanoset dataset Args: - dataset_paths (List[str]): List of paths to tokenized datasets - dataset_weights (List[float]): List with the weights for weighted datasets. If None, consume all samples from all datasets without weighting. Weights are normalized in __init__ + dataset_folders (List[str]): List of folders with tokenized datasets + dataset_weights (Union[List[float], None]): List with the weights for weighted datasets. If None, consume all samples from all datasets without weighting. Weights are normalized in __init__ sequence_length (int): Sequence length of the built samples - token_dtype (Union[np.uint16, np.int32]): dtype of the tokens stored in the processed dataset files. np.uin16 for vocab sizes < 65535, np.int32 otherwise + token_size (int): Number of bytes for the tokens stored in the processed dataset files. 2 for vocab sizes < 65535, 4 otherwise train_split_num_samples (int): Number of samples the dataset needs. It's the training steps * global batch size """ def __init__( self, - dataset_paths: List[str], - dataset_weights: Union[List[float], None], + dataset_folders: List[str], sequence_length: int, - token_dtype: Union[np.uint16, np.int32], + token_size: int, train_split_num_samples: int, + dataset_weights: Union[List[float], None] = None, random_seed: int = 1234, ) -> None: + # Checks + if isinstance(dataset_folders, str): + warnings.warn("dataset_folders should be of type List[str] but str was provided. Converting to List[str]") + dataset_folders = [dataset_folders] + # Init - self.dataset_paths = dataset_paths - self.dataset_weights = dataset_weights + self.dataset_folders = dataset_folders self.sequence_length = sequence_length - self.token_dtype = token_dtype + self.token_size = token_size self.train_split_num_samples = train_split_num_samples self.random_seed = random_seed + self.datatrove_datasets = [] + for dataset_folder in self.dataset_folders: + self.datatrove_datasets.append( + DatatroveFolderDataset( + folder_path=dataset_folder, + filename_pattern=os.path.join(dataset_folder, "*.ds"), + seq_len=sequence_length, + recursive=False, + token_size=token_size, + shuffle=True, + ) + ) # Build Nanoset Index ## To build the index we need the length of each dataset - self.dataset_lengths = [] - for dataset_path in self.dataset_paths: - self.dataset_buffer_mmap = np.memmap(dataset_path, mode="r", order="C", dtype=self.token_dtype) - self.dataset_buffer = memoryview(self.dataset_buffer_mmap) - dataset_number_of_tokens = int(len(self.dataset_buffer)) - number_of_samples = int( - (dataset_number_of_tokens - 1) / sequence_length - ) # Discard last sample if length < sequence_length - self.dataset_lengths.append(number_of_samples) + self.dataset_lengths = [len(datatrove_dataset) for datatrove_dataset in self.datatrove_datasets] ## Set dataset weights if ( - self.dataset_weights is None + dataset_weights is None ): # Case of training with > 1 datasets without weighting them: Consume both datasets entirely on each epoch self.dataset_weights = normalize(self.dataset_lengths) else: self.dataset_weights = normalize(dataset_weights) + assert len(dataset_folders) == len( + self.dataset_weights + ), f"Specified {len(self.dataset_weights)} weights but {len(dataset_folders)} datasets were provided." ## Build dataset index and dataset sample index self.dataset_index, self.dataset_sample_index = self.build_nanoset_index() @@ -79,25 +93,12 @@ def __getitem__(self, idx: int) -> Dict[str, np.ndarray]: idx (int): The index into the dataset Returns: - Dict[str, numpy.ndarray]: The input ids wrapped in a dictionary + Dict[str, torch.LongTensor]: The input ids wrapped in a dictionary """ - dataset = self.dataset_index[idx] dataset_sample = self.dataset_sample_index[idx] - # Rebuild the memmap in every access to free memory - # https://stackoverflow.com/a/61472122 - self.dataset_buffer_mmap = np.memmap(self.dataset_paths[dataset], mode="r", order="C", dtype=self.token_dtype) - self.dataset_buffer = memoryview(self.dataset_buffer_mmap) - - # uint16 -> 2 bytes per token, int32 -> 4 bytes per token - offset = dataset_sample * self.sequence_length * (np.iinfo(self.token_dtype).bits / 8) - input_ids_tokens = np.frombuffer( - self.dataset_buffer, dtype=self.token_dtype, count=(self.sequence_length + 1), offset=int(offset) - ) - - # Return tokens as np.int32 as Torch can't handle uint16 - return {"input_ids": input_ids_tokens.astype(np.int32)} + return self.datatrove_datasets[dataset][dataset_sample] def build_nanoset_index(self) -> np.ndarray: """ @@ -124,15 +125,6 @@ def build_nanoset_index(self) -> np.ndarray: return dataset_index, dataset_sample_index - def __del__(self) -> None: - """ - Clean up Nanoset - """ - - if hasattr(self, "dataset_buffer_mmap"): - self.dataset_buffer_mmap._mmap.close() - del self.dataset_buffer_mmap - def print_nanoset_info(self): log_rank(f"> Total number of samples: {len(self)}", logger=logger, level=logging.INFO, rank=0) @@ -141,10 +133,10 @@ def print_nanoset_info(self): ) # Print samples from each dataset + weight - dataset_sample_count = count_dataset_indexes(self.dataset_index, len(self.dataset_paths)) + dataset_sample_count = count_dataset_indexes(self.dataset_index, len(self.dataset_folders)) for index, sample_count in enumerate(dataset_sample_count): log_rank( - f"> Total number of samples from the {self.dataset_paths[index].rsplit('/', 1)[-1]} dataset: {sample_count} ({round(normalize(dataset_sample_count).tolist()[index], 2)})", + f"> Total number of samples from the {self.dataset_folders[index]} dataset: {sample_count} ({round(normalize(dataset_sample_count).tolist()[index], 2)})", logger=logger, level=logging.INFO, rank=0, diff --git a/tests/helpers/data.py b/tests/helpers/data.py index 33bb2480..72deb7f5 100644 --- a/tests/helpers/data.py +++ b/tests/helpers/data.py @@ -3,6 +3,7 @@ import json import os import sys +from argparse import Namespace from collections import OrderedDict from pathlib import Path @@ -10,8 +11,6 @@ package_path = Path(package.__file__).parent.parent.parent sys.path.append(str(package_path)) -from argparse import Namespace - import nanotron.distributed as dist import torch from nanotron.data.nanoset import Nanoset @@ -23,31 +22,34 @@ def create_dataset_paths(tmp_dir: str, quantity: int): - json_dataset_path = [os.path.join(tmp_dir, f"pytest_{i}") for i in range(quantity)] - mmap_dataset_path = [f"{path}_input_ids.npy" for path in json_dataset_path] + json_dataset_path = [os.path.join(tmp_dir, f"pytest_{i}.json") for i in range(quantity)] + datatrove_tokenized_dataset_paths = [os.path.join(tmp_dir, f"tokenized_documents_{i}") for i in range(quantity)] - return json_dataset_path, mmap_dataset_path + return json_dataset_path, datatrove_tokenized_dataset_paths def create_dummy_json_dataset(path_to_json: str, dummy_text: str, n_samples: int = 50000): - with open(path_to_json + ".json", "a") as json_file: + with open(path_to_json, "a") as json_file: for sample in range(n_samples): sample_dict = {"text": f"[{sample}] Hello! Im sample {sample}! And this is my dummy text: {dummy_text}"} json_file.write(json.dumps(sample_dict)) json_file.write("\n") -def preprocess_dummy_dataset(path_to_json: str, tokenizer: str): +def preprocess_dummy_dataset(json_dataset_path: str, datatrove_tokenized_dataset_path: str, tokenizer: str): # Create args for preprocessing args = Namespace( - input=path_to_json + ".json", + readers="jsonl", + dataset=json_dataset_path, column="text", - output_prefix=path_to_json, + glob_pattern=None, + output_folder=datatrove_tokenized_dataset_path, tokenizer_name_or_path=tokenizer, - add_special_tokens=False, + eos_token=None, + n_tasks=1, + logging_dir=None, ) - # tools/preprocess_data.py main main(args) @@ -122,7 +124,7 @@ def assert_nanoset_sync_across_all_ranks(nanoset: Nanoset, parallel_context: Par IDX_SAMPLE = 23 nanoset_identifiers = OrderedDict() - nanoset_identifiers["dataset_paths"] = nanoset.dataset_paths + nanoset_identifiers["dataset_folders"] = nanoset.dataset_folders nanoset_identifiers["dataset_weights"] = nanoset.dataset_weights.tolist() nanoset_identifiers["sequence_length"] = nanoset.sequence_length nanoset_identifiers["train_split_num_samples"] = nanoset.train_split_num_samples @@ -131,6 +133,7 @@ def assert_nanoset_sync_across_all_ranks(nanoset: Nanoset, parallel_context: Par nanoset_identifiers["input_ids"] = nanoset[IDX_SAMPLE]["input_ids"].tolist() nanoset_identifiers["dataset_index"] = nanoset.dataset_index.tolist() nanoset_identifiers["dataset_sample_index"] = nanoset.dataset_sample_index.tolist() + nanoset_identifiers["token_size"] = nanoset.token_size unique_description_hash = compute_hash(nanoset_identifiers) assert_tensor_synced_across_pg( diff --git a/tests/nanoset/test_build_nanoset_dataloader.py b/tests/nanoset/test_build_nanoset_dataloader.py index 2c3ff542..113c545c 100644 --- a/tests/nanoset/test_build_nanoset_dataloader.py +++ b/tests/nanoset/test_build_nanoset_dataloader.py @@ -1,6 +1,7 @@ import sys from math import isclose from pathlib import Path +from typing import List package_path = Path(__file__).parent.parent sys.path.append(str(package_path)) @@ -33,7 +34,7 @@ for all_3d_configs in get_all_3d_configurations(gpus) ], ) -@pytest.mark.parametrize("train_steps", [5, 100]) +@pytest.mark.parametrize("train_steps", [500, 10000]) @pytest.mark.parametrize("sequence_length", [512, 8192]) @pytest.mark.parametrize("tokenizer_name_or_path", ["openai-community/gpt2", "unsloth/llama-3-8b-bnb-4bit"]) @rerun_if_address_is_in_use() @@ -42,16 +43,21 @@ def test_build_nanoset_dataloader( ): test_context = TestContext() - # Create dataset files - json_paths, mmap_dataset_paths = create_dataset_paths(tmp_dir=test_context.get_auto_remove_tmp_dir(), quantity=2) + # Create dataset folders + json_paths, datatrove_tokenized_dataset_folders = create_dataset_paths( + tmp_dir=test_context.get_auto_remove_tmp_dir(), quantity=2 + ) # Create dummy json datasets for idx, json_path in enumerate(json_paths): create_dummy_json_dataset(path_to_json=json_path, dummy_text=f"Nanoset {idx}!", n_samples=(idx + 1) * 50000) + # Preprocess json dataset with datatrove + for json_path, datatrove_tokenized_dataset_folder in zip(json_paths, datatrove_tokenized_dataset_folders): + preprocess_dummy_dataset(json_path, datatrove_tokenized_dataset_folder, tokenizer_name_or_path) + init_distributed(tp=tp, dp=dp, pp=pp)(_test_build_nanoset_dataloader)( - json_paths=json_paths, - path_to_mmap_files=mmap_dataset_paths, + datatrove_tokenized_dataset_folders=datatrove_tokenized_dataset_folders, train_steps=train_steps, sequence_length=sequence_length, tokenizer_name_or_path=tokenizer_name_or_path, @@ -60,8 +66,7 @@ def test_build_nanoset_dataloader( def _test_build_nanoset_dataloader( parallel_context: ParallelContext, - json_paths: str, - path_to_mmap_files: str, + datatrove_tokenized_dataset_folders: List[str], train_steps: int, sequence_length: int, tokenizer_name_or_path: str, @@ -71,41 +76,37 @@ def _test_build_nanoset_dataloader( N_MICRO_BATCHES_PER_BATCH = 8 GLOBAL_BATCH_SIZE = MICRO_BATCH_SIZE * N_MICRO_BATCHES_PER_BATCH * parallel_context.dp_pg.size() - # Preprocess dummy json datasets - for json_path in json_paths: - preprocess_dummy_dataset(path_to_json=json_path, tokenizer=tokenizer_name_or_path) - input_pp_rank, output_pp_rank = 0, int(parallel_context.pp_pg.size() - 1) # Get tokenizer cardinality tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path) - token_dtype = np.int32 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else np.uint16 + token_size = 4 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else 2 del tokenizer # Create Nanoset configs: 1. Normal 2. Blended 3. Blended with weights nanoset_config = { - "dataset_paths": [path_to_mmap_files[0]], + "dataset_folders": [datatrove_tokenized_dataset_folders[0]], "dataset_weights": [1], "sequence_length": sequence_length, - "token_dtype": token_dtype, + "token_size": token_size, "train_split_num_samples": train_steps * GLOBAL_BATCH_SIZE, "random_seed": SEED, } blended_nanoset_config = { - "dataset_paths": [path_to_mmap_files[0], path_to_mmap_files[1]], + "dataset_folders": datatrove_tokenized_dataset_folders, "dataset_weights": None, "sequence_length": sequence_length, - "token_dtype": token_dtype, + "token_size": token_size, "train_split_num_samples": train_steps * GLOBAL_BATCH_SIZE, "random_seed": SEED, } blended_weighted_nanoset_config = { - "dataset_paths": [path_to_mmap_files[0], path_to_mmap_files[1]], + "dataset_folders": datatrove_tokenized_dataset_folders, "dataset_weights": [8, 2], "sequence_length": sequence_length, - "token_dtype": token_dtype, + "token_size": token_size, "train_split_num_samples": train_steps * GLOBAL_BATCH_SIZE, "random_seed": SEED, } @@ -119,7 +120,7 @@ def _test_build_nanoset_dataloader( # Assert we have the same Nanoset in all ranks assert_nanoset_sync_across_all_ranks(train_dataset, parallel_context) - dataset_sample_count = count_dataset_indexes(train_dataset.dataset_index, len(train_dataset.dataset_paths)) + dataset_sample_count = count_dataset_indexes(train_dataset.dataset_index, len(train_dataset.dataset_folders)) for idx, ds_length in enumerate(train_dataset.dataset_lengths): # Assert Nanoset doesn't sample indexes greater than the datasets assert ( @@ -129,7 +130,7 @@ def _test_build_nanoset_dataloader( # Assert Nanoset builds up the correct blend WRT the dataset_weights assert isclose( normalize(dataset_sample_count).tolist()[idx], train_dataset.dataset_weights[idx], abs_tol=0.05 - ), f"Requested Nanoset to contain {round(train_dataset.dataset_weights[idx]*100, 2)}% of samples from {train_dataset.dataset_paths[idx]} but got {round(normalize(dataset_sample_count).tolist()[idx]*100, 2)}%" + ), f"Requested Nanoset to contain {round(train_dataset.dataset_weights[idx]*100, 2)}% of samples from {train_dataset.dataset_folders[idx]} but got {round(normalize(dataset_sample_count).tolist()[idx]*100, 2)}%" # Create Dataloaders dataloader = build_nanoset_dataloader( train_dataset, @@ -162,22 +163,27 @@ def _test_build_nanoset_dataloader( for all_3d_configs in get_all_3d_configurations(gpus) ], ) -@pytest.mark.parametrize("skipped_batches", [20, 50]) +@pytest.mark.parametrize("skipped_batches", [20, 5555]) @pytest.mark.parametrize("tokenizer_name_or_path", ["openai-community/gpt2", "unsloth/llama-3-8b-bnb-4bit"]) @rerun_if_address_is_in_use() def test_recover_nanoset_dataloader(tp: int, dp: int, pp: int, skipped_batches: int, tokenizer_name_or_path: str): test_context = TestContext() - # Create dataset files - json_paths, mmap_dataset_paths = create_dataset_paths(tmp_dir=test_context.get_auto_remove_tmp_dir(), quantity=2) + # Create dataset folders + json_paths, datatrove_tokenized_dataset_folders = create_dataset_paths( + tmp_dir=test_context.get_auto_remove_tmp_dir(), quantity=2 + ) # Create dummy json datasets for idx, json_path in enumerate(json_paths): create_dummy_json_dataset(path_to_json=json_path, dummy_text=f"Nanoset {idx}!", n_samples=(idx + 1) * 50000) + # Preprocess json dataset with datatrove + for json_path, datatrove_tokenized_dataset_folder in zip(json_paths, datatrove_tokenized_dataset_folders): + preprocess_dummy_dataset(json_path, datatrove_tokenized_dataset_folder, tokenizer_name_or_path) + init_distributed(tp=tp, dp=dp, pp=pp)(_test_recover_nanoset_dataloader)( - json_paths=json_paths, - path_to_mmap_files=mmap_dataset_paths, + datatrove_tokenized_dataset_folders=datatrove_tokenized_dataset_folders, skipped_batches=skipped_batches, tokenizer_name_or_path=tokenizer_name_or_path, ) @@ -185,8 +191,7 @@ def test_recover_nanoset_dataloader(tp: int, dp: int, pp: int, skipped_batches: def _test_recover_nanoset_dataloader( parallel_context: ParallelContext, - json_paths: str, - path_to_mmap_files: str, + datatrove_tokenized_dataset_folders: List[str], skipped_batches: int, tokenizer_name_or_path: str, ): @@ -195,43 +200,39 @@ def _test_recover_nanoset_dataloader( N_MICRO_BATCHES_PER_BATCH = 8 GLOBAL_BATCH_SIZE = MICRO_BATCH_SIZE * N_MICRO_BATCHES_PER_BATCH * parallel_context.dp_pg.size() SEQUENCE_LENGTH = 1024 - TRAIN_STEPS = 100 - - # Preprocess dummy json datasets - for json_path in json_paths: - preprocess_dummy_dataset(path_to_json=json_path, tokenizer=tokenizer_name_or_path) + TRAIN_STEPS = 10000 input_pp_rank, output_pp_rank = 0, int(parallel_context.pp_pg.size() - 1) # Get tokenizer cardinality tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path) - token_dtype = np.int32 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else np.uint16 + token_size = 4 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else 2 del tokenizer # Create Nanoset configs: 1. Normal 2. Blended 3. Blended with weights nanoset_config = { - "dataset_paths": [path_to_mmap_files[0]], + "dataset_folders": [datatrove_tokenized_dataset_folders[0]], "dataset_weights": [1], "sequence_length": SEQUENCE_LENGTH, - "token_dtype": token_dtype, + "token_size": token_size, "train_split_num_samples": TRAIN_STEPS * GLOBAL_BATCH_SIZE, "random_seed": SEED, } blended_nanoset_config = { - "dataset_paths": [path_to_mmap_files[0], path_to_mmap_files[1]], + "dataset_folders": datatrove_tokenized_dataset_folders, "dataset_weights": None, "sequence_length": SEQUENCE_LENGTH, - "token_dtype": token_dtype, + "token_size": token_size, "train_split_num_samples": TRAIN_STEPS * GLOBAL_BATCH_SIZE, "random_seed": SEED, } blended_weighted_nanoset_config = { - "dataset_paths": [path_to_mmap_files[0], path_to_mmap_files[1]], + "dataset_folders": datatrove_tokenized_dataset_folders, "dataset_weights": [8, 2], "sequence_length": SEQUENCE_LENGTH, - "token_dtype": token_dtype, + "token_size": token_size, "train_split_num_samples": TRAIN_STEPS * GLOBAL_BATCH_SIZE, "random_seed": SEED, } diff --git a/tools/preprocess_data.py b/tools/preprocess_data.py index 465d22f0..38db67f1 100644 --- a/tools/preprocess_data.py +++ b/tools/preprocess_data.py @@ -1,26 +1,21 @@ -import argparse -import os -import shutil -import sys +""" +To process HuggingFace Datasets: + python3 tools/preprocess_data.py --tokenizer-name-or-path meta-llama/Meta-Llama-3-8B --output-folder datasets/emotion --n-tasks 16 hf --dataset dair-ai/emotion +To process Jsonl files: + python3 tools/preprocess_data.py --tokenizer-name-or-path meta-llama/Meta-Llama-3-8B --output-folder datasets/c4-es --n-tasks 16 jsonl --dataset raw_datasets/c4-es-json-files +""" -import numpy as np -import torch.distributed as dist -from tqdm import tqdm -from transformers import AutoTokenizer +import argparse -from datasets import concatenate_datasets, load_dataset +from datatrove.executor.local import LocalPipelineExecutor +from datatrove.pipeline.readers import HuggingFaceDatasetReader, JsonlReader +from datatrove.pipeline.tokens import DocumentTokenizer def get_args(): parser = argparse.ArgumentParser() - group = parser.add_argument_group(title="input data") - group.add_argument( - "--input", type=str, required=True, help="Path to local stored dataset or repository on the Hugging Face hub" - ) - group.add_argument("--column", type=str, default="text", help="Column to preprocess from the Dataset") - parser.add_argument("--split", type=str, default="train", help="Which split of the data to process") - group = parser.add_argument_group(title="tokenizer") + group = parser.add_argument_group(title="Tokenizer") group.add_argument( "--tokenizer-name-or-path", type=str, @@ -28,13 +23,54 @@ def get_args(): help="A path to a directory containing vocabulary files required by the tokenizer or the model id of a predefined tokenizer hosted inside a model repo on the Hugging Face Hub.", ) group.add_argument( - "--add-special-tokens", - action="store_true", - help="Whether or not to add special tokens when encoding the sequences. This will be passed to the Tokenizer", + "--eos-token", + type=str, + default=None, + help="EOS token to add after each document. Default: None", + ) + + group = parser.add_argument_group(title="Output data") + group.add_argument( + "--output-folder", type=str, required=True, help="Path to the output folder to store the tokenized documents" + ) + group = parser.add_argument_group(title="Miscellaneous configs") + group.add_argument( + "--logging-dir", + type=str, + default=None, + help="Path to a folder for storing the logs of the preprocessing step. Default: None", + ) + group.add_argument( + "--n-tasks", type=int, default=8, help="Total number of tasks to run the preprocessing step. Default: 8" + ) + # Subparsers for processing either Hugging Face datasets or jsonl files + sp = parser.add_subparsers( + dest="readers", + required=True, + description="Type of dataset to process. It can be either a Hugging Face Dataset loaded with datasets.load_data ('hf') or a .jsonl dataset ('jsonl')", + ) + + p1 = sp.add_parser(name="hf") + p1.add_argument( + "--dataset", + type=str, + required=True, + help="Path to local stored dataset or repository on the Hugging Face hub that can be loaded with datasets.load_dataset", ) + p1.add_argument("--column", type=str, default="text", help="Column to preprocess from the Dataset. Default: text") + p1.add_argument("--split", type=str, default="train", help="Which split of the data to process. Default: train") - group = parser.add_argument_group(title="output data") - group.add_argument("--output-prefix", type=str, required=True, help="Path to the output processed dataset file") + p2 = sp.add_parser(name="jsonl") + p2.add_argument( + "--dataset", + type=str, + required=True, + help="Path to a .jsonl file or a folder containing multiple .jsonl files", + ) + p2.add_argument("--column", type=str, default="text", help="Column to preprocess from the Dataset. Default: text") + p2.add_argument( + "--glob-pattern", type=str, default=None, help="A glob pattern to filter files to read. Default: None" + ) args = parser.parse_args() @@ -42,74 +78,32 @@ def get_args(): def main(args): - - world_size, rank = int(os.environ["WORLD_SIZE"]), int(os.environ["RANK"]) - - # Remove stdout from all processes except main to not flood the stdout - if rank: - sys.stdout = open(os.devnull, "w") - - # Check if output directory exists - if not os.path.isdir(os.path.abspath(os.path.join(args.output_prefix, os.path.pardir))): - print(f"Creating {os.path.abspath(os.path.join(args.output_prefix, os.path.pardir))} directory...") - os.makedirs(os.path.abspath(os.path.join(args.output_prefix, os.path.pardir)), exist_ok=True) - - if args.input.endswith(".json"): # For processing JSON files (Cross compatibility with other projects) - ds = load_dataset("json", data_files=args.input) - ds = concatenate_datasets( - [ds[splits] for splits in ds.keys()] - ) # load_dataset returns DatasetDict and we want a Dataset + # Build datatrove reader + if args.readers == "hf": + datatrove_reader = HuggingFaceDatasetReader( + dataset=args.dataset, + text_key=args.column, + dataset_options={"split": args.split}, + ) else: - ds = load_dataset(args.input, split=args.split) - - ds = ds.shard(num_shards=world_size, index=rank, contiguous=True) - ds = ds.select_columns(args.column) - - tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_name_or_path) - token_dtype = np.int32 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else np.uint16 - - # Create tmp directory for worker outputs - tmp_folder = os.path.abspath(os.path.join(args.output_prefix, os.pardir, "tmp")) - os.makedirs(tmp_folder, exist_ok=True) - - print("Creating workers output files...") - worker_output_file = os.path.join(tmp_folder, f"worker_{rank}_input_ids.npy") - ds = ds.map( - lambda x: {"input_ids": tokenizer(x, add_special_tokens=args.add_special_tokens).input_ids}, - input_columns=args.column, - batched=True, - desc="Tokenizing Dataset", - remove_columns=[args.column], + datatrove_reader = JsonlReader(data_folder=args.dataset, text_key=args.column, glob_pattern=args.glob_pattern) + + preprocess_executor = LocalPipelineExecutor( + pipeline=[ + datatrove_reader, + DocumentTokenizer( + output_folder=args.output_folder, + tokenizer_name_or_path=args.tokenizer_name_or_path, + eos_token=args.eos_token, + max_tokens_per_file=1e9, + ), + ], + tasks=args.n_tasks, + logging_dir=args.logging_dir, ) - - worker_input_ids_file = open(worker_output_file, "wb") - for sample in ds: - np_array = np.array(sample["input_ids"], dtype=token_dtype) - worker_input_ids_file.write(np_array.tobytes(order="C")) - worker_input_ids_file.close() - - # Wait for all workers to process each shard of the Dataset - dist.barrier() - - # Only the main rank merges the worker files - if not rank: - output_file = f"{args.output_prefix}_input_ids.npy" - input_ids_file = open(output_file, "wb") - for worker_idx in tqdm(range(world_size), desc="Merging workers output files"): - worker_output_file = os.path.join(tmp_folder, f"worker_{worker_idx}_input_ids.npy") - with open(worker_output_file, "rb") as f: - shutil.copyfileobj(f, input_ids_file) - os.remove(worker_output_file) - - input_ids_file.close() - os.rmdir(tmp_folder) - print(f"Done! {args.input} processed dataset stored in {output_file}") - - else: # Close devnull stdout redirect - sys.stdout.close() + preprocess_executor.run() if __name__ == "__main__": _args = get_args() - dist.init_process_group(backend="gloo") main(_args)