From 3732663171103f8bd1388c18586dd41fd1f5d311 Mon Sep 17 00:00:00 2001
From: Hariharan Devarajan
Date: Tue, 24 Sep 2024 14:14:30 -0700
Subject: [PATCH] Fix uneven sampling for sample indexing (#226)

* Fix uneven sampling for sample indexing

1. Fix uneven sampling for both the index-based and the iterative sampler.
2. Add a validation step to verify that global indices are correctly shuffled and no indices are lost.
3. Perform file and sample shuffling in the reconfigure step.
4. Remove sample shuffling from the dataloader Sampler code.
5. Add a test case to cover uneven file distributions (#225).

* Increase GOTCHA and DFTRACER log levels to reduce printed output

* Expand test cases to improve coverage

* Reduce reader threads, as there is not enough data

* Ensure the sampler does not go past the file on the last rank.
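The core of the fix is the sharding arithmetic. A minimal standalone sketch of
the scheme (illustrative only; the function name is hypothetical, not the
benchmark's API): each rank takes ceil(N/size) consecutive indices, and the end
index is clamped so the last rank stops at N - 1 instead of running past the
dataset.

    import math

    def shard_indices(rank, size, num_samples):
        # Each rank gets ceil(num_samples/size) consecutive sample indices.
        samples_per_proc = int(math.ceil(num_samples / size))
        start_sample = rank * samples_per_proc
        # Clamp so the last rank does not run past the dataset.
        end_sample = min((rank + 1) * samples_per_proc - 1, num_samples - 1)
        return list(range(start_sample, end_sample + 1))

    # Uneven case exercised by the new tests: 17 files on 2 ranks.
    # rank 0 -> [0..8] (9 samples), rank 1 -> [9..16] (8 samples).
    assert shard_indices(0, 2, 17) == list(range(0, 9))
    assert shard_indices(1, 2, 17) == list(range(9, 17))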
---
 .github/workflows/ci.yml                     |  94 +++++++++-----
 .../data_loader/dali_data_loader.py          |  31 ++---
 .../data_loader/torch_data_loader.py         |  17 +--
 dlio_benchmark/utils/config.py               | 116 ++++++++++++------
 dlio_benchmark/utils/utility.py              |   8 ++
 pytest.ini                                   |   2 +
 tests/dlio_benchmark_test.py                 |  56 ++++++---
 7 files changed, 201 insertions(+), 123 deletions(-)
 create mode 100644 pytest.ini

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 924e1d86..4c90663d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -103,35 +103,65 @@ jobs:
       - name: test_train
         run: |
           source ${VENV_PATH}/bin/activate
-          mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow] -v
-          mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow] -v
-          mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow] -v
-          mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow] -v
-          mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow] -v
-          mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow] -v
-          mpirun -np 2 pytest -k test_train[png-pytorch-pytorch] -v
-          mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch] -v
-          mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch] -v
-          mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch] -v
-          mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch] -v
-          mpirun -np 2 pytest -k test_train[png-tensorflow-dali] -v
-          mpirun -np 2 pytest -k test_train[npz-tensorflow-dali] -v
-          mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali] -v
-          mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali] -v
-          mpirun -np 2 pytest -k test_train[csv-tensorflow-dali] -v
-          mpirun -np 2 pytest -k test_train[png-pytorch-dali] -v
-          mpirun -np 2 pytest -k test_train[npz-pytorch-dali] -v
-          mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali] -v
-          mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali] -v
-          mpirun -np 2 pytest -k test_train[csv-pytorch-dali] -v
-          mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow] -v
-          mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch] -v
-          mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali] -v
-          mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali] -v
-          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow] -v
-          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch] -v
-          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali] -v
-          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali] -v
+          mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow-True] -v
+          mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow-True] -v
+          mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow-True] -v
+          mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow-True] -v
+          mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow-True] -v
+          mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow-True] -v
+          mpirun -np 2 pytest -k test_train[png-pytorch-pytorch-True] -v
+          mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch-True] -v
+          mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch-True] -v
+          mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch-True] -v
+          mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch-True] -v
+          mpirun -np 2 pytest -k test_train[png-tensorflow-dali-True] -v
+          mpirun -np 2 pytest -k test_train[npz-tensorflow-dali-True] -v
+          mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali-True] -v
+          mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali-True] -v
+          mpirun -np 2 pytest -k test_train[csv-tensorflow-dali-True] -v
+          mpirun -np 2 pytest -k test_train[png-pytorch-dali-True] -v
+          mpirun -np 2 pytest -k test_train[npz-pytorch-dali-True] -v
+          mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali-True] -v
+          mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali-True] -v
+          mpirun -np 2 pytest -k test_train[csv-pytorch-dali-True] -v
+          mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow-True] -v
+          mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch-True] -v
+          mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali-True] -v
+          mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali-True] -v
+          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow-True] -v
+          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch-True] -v
+          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali-True] -v
+          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali-True] -v
+
+          mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow-False] -v
+          mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow-False] -v
+          mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow-False] -v
+          mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow-False] -v
+          mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow-False] -v
+          mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow-False] -v
+          mpirun -np 2 pytest -k test_train[png-pytorch-pytorch-False] -v
+          mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch-False] -v
+          mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch-False] -v
+          mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch-False] -v
+          mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch-False] -v
+          mpirun -np 2 pytest -k test_train[png-tensorflow-dali-False] -v
+          mpirun -np 2 pytest -k test_train[npz-tensorflow-dali-False] -v
+          mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali-False] -v
+          mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali-False] -v
+          mpirun -np 2 pytest -k test_train[csv-tensorflow-dali-False] -v
+          mpirun -np 2 pytest -k test_train[png-pytorch-dali-False] -v
+          mpirun -np 2 pytest -k test_train[npz-pytorch-dali-False] -v
+          mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali-False] -v
+          mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali-False] -v
+          mpirun -np 2 pytest -k test_train[csv-pytorch-dali-False] -v
+          mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow-False] -v
+          mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch-False] -v
+          mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali-False] -v
+          mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali-False] -v
+          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow-False] -v
+          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch-False] -v
+          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali-False] -v
+          mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali-False] -v
           rm -rf data
       - name: test_custom_storage_root_train
         run: |
@@ -227,9 +257,9 @@ jobs:
         run: |
           source ${VENV_PATH}/bin/activate
           rm -rf output data checkpoints
-          mpirun -np 2 ${DLIO_EXEC} workload=resnet50_a100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.reader.read_threads=1
-          mpirun -np 2 ${DLIO_EXEC} workload=resnet50_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.reader.read_threads=1
-          mpirun -np 2 ${DLIO_EXEC} workload=resnet50_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.reader.read_threads=1 ++workload.dataset.format=synthetic
+          mpirun -np 2 ${DLIO_EXEC} workload=resnet50_a100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=8 ++workload.reader.read_threads=1
+          mpirun -np 2 ${DLIO_EXEC} workload=resnet50_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=8 ++workload.reader.read_threads=1
+          mpirun -np 2 ${DLIO_EXEC} workload=resnet50_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=8 ++workload.reader.read_threads=1 ++workload.dataset.format=synthetic
           rm -rf data
       - name: test_cosmoflow
         run: |
diff --git a/dlio_benchmark/data_loader/dali_data_loader.py b/dlio_benchmark/data_loader/dali_data_loader.py
index caec4d4f..1c5f7b86 100644
--- a/dlio_benchmark/data_loader/dali_data_loader.py
+++ b/dlio_benchmark/data_loader/dali_data_loader.py
@@ -35,7 +35,7 @@ class DaliIndexDataset(object):
 
     def __init__(self, format_type, dataset_type, epoch, worker_index,
-                 total_num_workers, total_num_samples, samples_per_worker, batch_size, shuffle=Shuffle.OFF, seed=1234):
+                 total_num_workers, total_num_samples, samples_per_worker, batch_size):
         self.format_type = format_type
         self.dataset_type = dataset_type
         self.epoch = epoch
@@ -44,22 +44,19 @@ def __init__(self, format_type, dataset_type, epoch, worker_index,
         self.samples_per_worker = samples_per_worker
         self.batch_size = batch_size
         self.worker_index = worker_index
-        self.shuffle = shuffle
         self.total_num_steps = self.samples_per_worker//batch_size
         self.reader = ReaderFactory.get_reader(type=self.format_type,
                                                dataset_type=self.dataset_type,
                                                thread_index=worker_index,
                                                epoch_number=self.epoch)
         assert(self.reader.is_index_based())
-        self.seed = seed
         start_sample = self.worker_index * samples_per_worker
-        end_sample = (self.worker_index + 1) * samples_per_worker
+        end_sample = (self.worker_index + 1) * samples_per_worker - 1
+        if end_sample > total_num_samples - 1:
+            end_sample = total_num_samples - 1
         if not hasattr(self, 'indices'):
-            self.indices = list(range(start_sample, end_sample))
-            if self.shuffle != Shuffle.OFF:
-                if self.shuffle == Shuffle.SEED:
-                    np.random.seed(self.seed)
-                np.random.shuffle(self.indices)
+            self.indices = list(range(start_sample, end_sample + 1))
+        self.samples_per_worker = len(self.indices)
 
     def __call__(self, sample_info):
         logging.debug(
             f"{utcnow()} Reading {sample_info.idx_in_epoch} out of {self.samples_per_worker} by worker {self.worker_index} with {self.indices} indices")
@@ -74,7 +71,7 @@
 
 class DaliIteratorDataset(object):
     def __init__(self, format_type, dataset_type, epoch, worker_index,
-                 total_num_workers, total_num_samples, samples_per_worker, batch_size, shuffle=Shuffle.OFF, seed=1234):
+                 total_num_workers, total_num_samples, samples_per_worker, batch_size):
         self.format_type = format_type
         self.dataset_type = dataset_type
         self.epoch = epoch
@@ -83,22 +80,12 @@ def __init__(self, format_type, dataset_type, epoch, worker_index,
         self.samples_per_worker = samples_per_worker
         self.batch_size = batch_size
         self.worker_index = worker_index
-        self.shuffle = shuffle
         self.total_num_steps = self.samples_per_worker//batch_size
         self.reader = ReaderFactory.get_reader(type=self.format_type,
                                                dataset_type=self.dataset_type,
                                                thread_index=worker_index,
                                                epoch_number=self.epoch)
         assert(self.reader.is_iterator_based())
-        self.seed = seed
-        start_sample = self.worker_index * samples_per_worker
-        end_sample = (self.worker_index + 1) * samples_per_worker
-        if not hasattr(self, 'indices'):
-            self.indices = list(range(start_sample, end_sample))
-            if self.shuffle != Shuffle.OFF:
-                if self.shuffle == Shuffle.SEED:
-                    np.random.seed(self.seed)
-                np.random.shuffle(self.indices)
 
     def __iter__(self):
         with Profile(MODULE_DATA_LOADER):
             for image in self.reader.next():
@@ -124,12 +111,12 @@ def read(self, init=False):
             if self._args.prefetch_size > 0:
                 prefetch_size = self._args.prefetch_size
         num_pipelines = 1
-        samples_per_worker = self.num_samples // num_pipelines // self._args.comm_size
+        samples_per_worker = int(math.ceil(self.num_samples/num_pipelines/self._args.comm_size))
         for worker_index in range(num_pipelines):
             global_worker_index = self._args.my_rank * num_pipelines + worker_index
             # None executes pipeline on CPU and the reader does the batching
             self.dataset = DaliIndexDataset(self.format_type, self.dataset_type, self.epoch_number, global_worker_index,
-                                            self._args.comm_size * num_pipelines, self.num_samples, samples_per_worker, 1, self._args.sample_shuffle, self._args.seed)
+                                            self._args.comm_size * num_pipelines, self.num_samples, samples_per_worker, 1)
             pipeline = Pipeline(batch_size=self.batch_size, num_threads=num_threads, device_id=None, py_num_workers=num_threads//num_pipelines,
                                 prefetch_queue_depth=prefetch_size, py_start_method=self._args.multiprocessing_context, exec_async=True)
             with pipeline:
diff --git a/dlio_benchmark/data_loader/torch_data_loader.py b/dlio_benchmark/data_loader/torch_data_loader.py
index 7cbcb82a..80212fff 100644
--- a/dlio_benchmark/data_loader/torch_data_loader.py
+++ b/dlio_benchmark/data_loader/torch_data_loader.py
@@ -85,27 +85,23 @@ def __getitem__(self, image_idx):
 
 class dlio_sampler(Sampler):
-    def __init__(self, rank, size, num_samples, shuffle, epochs, seed):
+    def __init__(self, rank, size, num_samples, epochs):
         self.size = size
         self.rank = rank
         self.num_samples = num_samples
-        self.shuffle = shuffle
         self.epochs = epochs
-        self.seed = seed
         samples_per_proc = int(math.ceil(num_samples/size))
         start_sample = self.rank * samples_per_proc
-        end_sample = (self.rank + 1) * samples_per_proc
-        self.indices = list(range(start_sample, end_sample))
+        end_sample = (self.rank + 1) * samples_per_proc - 1
+        if end_sample > num_samples - 1:
+            end_sample = num_samples - 1
+        self.indices = list(range(start_sample, end_sample + 1))
 
     def __len__(self):
         return self.num_samples
 
     def __iter__(self):
-        if self.shuffle != Shuffle.OFF:
-            if self.shuffle == Shuffle.SEED:
-                np.random.seed(self.seed)
-            np.random.shuffle(self.indices)
         for sample in self.indices:
             yield sample
 
@@ -118,8 +114,7 @@ def __init__(self, format_type, dataset_type, epoch_number):
     def read(self):
         dataset = TorchDataset(self.format_type, self.dataset_type, self.epoch_number, self.num_samples,
                                self._args.read_threads, self.batch_size)
-        sampler = dlio_sampler(self._args.my_rank, self._args.comm_size, self.num_samples, self._args.sample_shuffle,
-                               self._args.epochs, self._args.seed)
+        sampler = dlio_sampler(self._args.my_rank, self._args.comm_size, self.num_samples, self._args.epochs)
        if self._args.read_threads >= 1:
            prefetch_factor = math.ceil(self._args.prefetch_size / self._args.read_threads)
        else:
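The config.py changes that follow move all sample shuffling into
build_sample_map_iter and get_global_map_index, seeded so that every rank draws
the same permutation. A sketch of the seeding scheme (illustrative only; the
function name is hypothetical):

    import numpy as np

    def shuffle_shard(sample_list, seed, epoch_number, seed_change_epoch):
        # Every rank seeds NumPy's global RNG identically, so the shuffle is
        # reproducible across ranks; folding in the epoch number gives a
        # different (but still deterministic) order each epoch.
        np.random.seed(seed + epoch_number if seed_change_epoch else seed)
        np.random.shuffle(sample_list)
        return sample_list

A shuffle only permutes a rank's shard, so it leaves the sum of the assigned
indices unchanged — which is why the index-sum validation added below still
works with shuffling enabled.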
diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py
index ce717143..13265f41 100644
--- a/dlio_benchmark/utils/config.py
+++ b/dlio_benchmark/utils/config.py
@@ -139,6 +139,8 @@ class ConfigArguments:
     reader_class = None
     checkpoint_mechanism_class = None
     native_data_loader = False
+    train_sample_index_sum = 1
+    eval_sample_index_sum = 1
 
     def __init__(self):
         """ Virtually private constructor. """
@@ -261,6 +263,8 @@ def derive_configurations(self, file_list_train=None, file_list_eval=None):
             self.num_files_train = len(file_list_train)
         self.total_samples_train = self.num_samples_per_file * len(self.file_list_train)
         self.total_samples_eval = self.num_samples_per_file * len(self.file_list_eval)
+        self.train_sample_index_sum = self.total_samples_train * (self.total_samples_train - 1) // 2
+        self.eval_sample_index_sum = self.total_samples_eval * (self.total_samples_eval - 1) // 2
         self.required_samples = self.comm_size * self.batch_size
         if self.read_threads > 0:
             self.required_samples *= self.read_threads
@@ -316,53 +320,75 @@
     @dlp.log
     def build_sample_map_iter(self, file_list, total_samples, epoch_number):
         logging.debug(f"ranks {self.comm_size} threads {self.read_threads} tensors")
-        num_threads = 1
-        if self.read_threads > 0 and self.data_loader is not DataLoaderType.DALI:
-            num_threads = self.read_threads
-        samples_per_proc = int(math.ceil(total_samples/self.comm_size))
-        self.samples_per_thread = samples_per_proc // num_threads
-        start_sample_index = samples_per_proc * self.my_rank
-        end_sample_index = samples_per_proc * (self.my_rank + 1)
-        sample_list = np.arange(start_sample_index, end_sample_index)
-        if self.sample_shuffle is not Shuffle.OFF:
-            if self.seed_change_epoch:
-                np.random.seed(self.seed + epoch_number)
-            else:
-                np.random.seed(self.seed)
-            np.random.shuffle(sample_list)
-        sample_index = 0
+        num_files = len(file_list)
+        samples_sum = 0
         process_thread_file_map = {}
         if num_files > 0:
-            files_per_rank = (num_files // self.comm_size) % num_files
-            file_index = self.my_rank * files_per_rank
-            for thread_index in range(num_threads):
-                process_thread_file_map[thread_index] = []
-            for sample in sample_list:
-                thread_index = (sample_index // self.samples_per_thread) % num_threads
-                abs_path = os.path.abspath(file_list[file_index])
-                process_thread_file_map[thread_index].append((sample,
-                                                              abs_path,
-                                                              sample_list[sample_index] % self.num_samples_per_file))
-                sample_index += 1
-                file_index = (sample_index // self.num_samples_per_file) % num_files
-        return process_thread_file_map
+            num_threads = 1
+            if self.read_threads > 0 and self.data_loader is not DataLoaderType.DALI:
+                num_threads = self.read_threads
+            samples_per_proc = int(math.ceil(total_samples/self.comm_size))
+            self.samples_per_thread = samples_per_proc // num_threads
+            start_sample_index = samples_per_proc * self.my_rank
+            end_sample_index = samples_per_proc * (self.my_rank + 1) - 1
+            if end_sample_index > total_samples - 1:
+                end_sample_index = total_samples - 1
+            sample_list = np.arange(start_sample_index, end_sample_index + 1)
+            logging.debug(f"{self.my_rank} {start_sample_index} {end_sample_index}")
+            if self.sample_shuffle is not Shuffle.OFF:
+                if self.seed_change_epoch:
+                    np.random.seed(self.seed + epoch_number)
+                else:
+                    np.random.seed(self.seed)
+                np.random.shuffle(sample_list)
+            sample_index = 0
+            if num_files > 0:
+                files_per_rank = (num_files // self.comm_size) % num_files
+                file_index = self.my_rank * files_per_rank
+                for thread_index in range(num_threads):
+                    process_thread_file_map[thread_index] = []
+                for sample in sample_list:
+                    samples_sum += sample
+                    thread_index = (sample_index // self.samples_per_thread) % num_threads
+                    abs_path = os.path.abspath(file_list[file_index])
+                    process_thread_file_map[thread_index].append((sample,
+                                                                  abs_path,
+                                                                  sample_list[sample_index] % self.num_samples_per_file))
+                    sample_index += 1
+                    file_index = (sample_index // self.num_samples_per_file) % num_files
+        return process_thread_file_map, samples_sum
 
     @dlp.log
-    def get_global_map_index(self, file_list, total_samples):
+    def get_global_map_index(self, file_list, total_samples, epoch_number):
         process_thread_file_map = {}
         num_files = len(file_list)
+        start_sample = 0
+        end_sample = 0
+        samples_sum = 0
         if num_files > 0:
+            end_sample = total_samples - 1
             samples_per_proc = int(math.ceil(total_samples/self.comm_size))
             start_sample = self.my_rank * samples_per_proc
-            end_sample = (self.my_rank + 1) * samples_per_proc
-            for global_sample_index in range(start_sample, end_sample):
-                file_index = global_sample_index//self.num_samples_per_file
-                abs_path = os.path.abspath(file_list[file_index])
+            end_sample = (self.my_rank + 1) * samples_per_proc - 1
+            if end_sample > total_samples - 1:
+                end_sample = total_samples - 1
+            logging.debug(f"{self.my_rank} {start_sample} {end_sample}")
+            sample_list = np.arange(start_sample, end_sample + 1)
+            if self.sample_shuffle is not Shuffle.OFF:
+                if self.seed_change_epoch:
+                    np.random.seed(self.seed + epoch_number)
+                else:
+                    np.random.seed(self.seed)
+                np.random.shuffle(sample_list)
+            for sample_index in range(end_sample - start_sample + 1):
+                global_sample_index = sample_list[sample_index]
+                samples_sum += global_sample_index
+                file_index = int(math.floor(global_sample_index/self.num_samples_per_file))
+                abs_path = os.path.abspath(file_list[file_index])
                 sample_index = global_sample_index % self.num_samples_per_file
                 process_thread_file_map[global_sample_index] = (abs_path, sample_index)
-        logging.debug(f"{self.my_rank} {process_thread_file_map}")
-        return process_thread_file_map
+        return process_thread_file_map, samples_sum
 
     @dlp.log
     def reconfigure(self, epoch_number):
@@ -375,13 +401,23 @@ def reconfigure(self, epoch_number):
             np.random.shuffle(self.file_list_train)
             np.random.shuffle(self.file_list_eval)
         if self.data_loader_sampler == DataLoaderSampler.ITERATIVE:
-            self.train_file_map = self.build_sample_map_iter(self.file_list_train, self.total_samples_train,
-                                                             epoch_number)
-            self.val_file_map = self.build_sample_map_iter(self.file_list_eval, self.total_samples_eval, epoch_number)
+            self.train_file_map, local_train_sample_sum = self.build_sample_map_iter(self.file_list_train, self.total_samples_train,
+                                                                                     epoch_number)
+            self.val_file_map, local_eval_sample_sum = self.build_sample_map_iter(self.file_list_eval, self.total_samples_eval, epoch_number)
         elif self.data_loader_sampler == DataLoaderSampler.INDEX:
-            self.train_global_index_map = self.get_global_map_index(self.file_list_train, self.total_samples_train)
-            self.val_global_index_map = self.get_global_map_index(self.file_list_eval, self.total_samples_eval)
-
+            self.train_global_index_map, local_train_sample_sum = self.get_global_map_index(self.file_list_train, self.total_samples_train,
+                                                                                            epoch_number)
+            self.val_global_index_map, local_eval_sample_sum = self.get_global_map_index(self.file_list_eval, self.total_samples_eval,
+                                                                                         epoch_number)
+        global_train_sample_sum = DLIOMPI.get_instance().reduce(local_train_sample_sum)
+        global_eval_sample_sum = DLIOMPI.get_instance().reduce(local_eval_sample_sum)
+        if self.my_rank == 0:
+            logging.info(f"global sample index sum: train {global_train_sample_sum} eval {global_eval_sample_sum}")
+        if self.train_sample_index_sum != global_train_sample_sum:
+            raise Exception(f"Sharding of train samples is missing samples: got {global_train_sample_sum} but expected {self.train_sample_index_sum}")
+
+        if self.eval_sample_index_sum != global_eval_sample_sum:
+            raise Exception(f"Sharding of eval samples is missing samples: got {global_eval_sample_sum} but expected {self.eval_sample_index_sum}")
 
 def LoadConfig(args, config):
     '''
diff --git a/dlio_benchmark/utils/utility.py b/dlio_benchmark/utils/utility.py
index 7923fded..2e342a32 100644
--- a/dlio_benchmark/utils/utility.py
+++ b/dlio_benchmark/utils/utility.py
@@ -193,6 +193,14 @@ def nnodes(self):
             raise Exception(f"method {self.classname()}.size() called before initializing MPI")
         else:
             return self.mpi_size//self.mpi_ppn
+
+    def reduce(self, num):
+        from mpi4py import MPI
+        if self.mpi_state == MPIState.UNINITIALIZED:
+            raise Exception(f"method {self.classname()}.reduce() called before initializing MPI")
+        else:
+            return MPI.COMM_WORLD.allreduce(num, op=MPI.SUM)
+
     def finalize(self):
         from mpi4py import MPI
         if self.mpi_state == MPIState.MPI_INITIALIZED and MPI.Is_initialized():
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 00000000..5660001f
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+norecursedirs = venv* docs *.egg-info .git dlio_benchmark data checkpoints build hydra_log
\ No newline at end of file
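The validation wired into reconfigure() above reduces each rank's index sum and
compares it against N*(N-1)/2, the sum 0 + 1 + ... + (N-1). A matching sum is a
cheap necessary condition for a correct sharding — it catches lost or
duplicated index ranges, though not sum-preserving swaps. A minimal standalone
sketch of the same check (the file name and function are hypothetical):

    # validate_shard.py -- run with: mpirun -np 2 python validate_shard.py
    import math
    from mpi4py import MPI

    def validate_sharding(local_indices, total_samples, comm=MPI.COMM_WORLD):
        # Sum of all assigned indices must equal 0 + 1 + ... + (N-1) = N*(N-1)/2.
        # An allreduce (rather than reduce) lets every rank perform the
        # comparison, matching the DLIOMPI.reduce() helper added above.
        expected = total_samples * (total_samples - 1) // 2
        got = comm.allreduce(sum(local_indices), op=MPI.SUM)
        if got != expected:
            raise Exception(f"sharding is missing samples: got {got} but expected {expected}")

    if __name__ == "__main__":
        comm = MPI.COMM_WORLD
        n = 17  # uneven: not divisible by the number of ranks
        samples_per_proc = int(math.ceil(n / comm.size))
        start = comm.rank * samples_per_proc
        end = min((comm.rank + 1) * samples_per_proc - 1, n - 1)
        validate_sharding(range(start, end + 1), n)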
diff --git a/tests/dlio_benchmark_test.py b/tests/dlio_benchmark_test.py
index 6c3b476d..99e2faa4 100644
--- a/tests/dlio_benchmark_test.py
+++ b/tests/dlio_benchmark_test.py
@@ -390,26 +390,46 @@ def test_pytorch_multiprocessing_context(nt, context) -> None:
     finalize()
 
 @pytest.mark.timeout(60, method="thread")
-@pytest.mark.parametrize("fmt, framework, dataloader", [("png", "tensorflow","tensorflow"), ("npz", "tensorflow","tensorflow"),
-                                                        ("jpeg", "tensorflow","tensorflow"), ("tfrecord", "tensorflow","tensorflow"),
-                                                        ("hdf5", "tensorflow","tensorflow"), ("csv", "tensorflow","tensorflow"),
-                                                        ("indexed_binary", "tensorflow","tensorflow"), ("mmap_indexed_binary", "tensorflow","tensorflow"),
-                                                        ("png", "pytorch", "pytorch"), ("npz", "pytorch", "pytorch"),
-                                                        ("jpeg", "pytorch", "pytorch"), ("hdf5", "pytorch", "pytorch"),
-                                                        ("csv", "pytorch", "pytorch"), ("indexed_binary", "pytorch", "pytorch"),
-                                                        ("mmap_indexed_binary", "pytorch", "pytorch"),
-                                                        ("png", "tensorflow", "dali"), ("npz", "tensorflow", "dali"),
-                                                        ("jpeg", "tensorflow", "dali"), ("hdf5", "tensorflow", "dali"),
-                                                        ("csv", "tensorflow", "dali"), ("indexed_binary", "tensorflow", "dali"),
-                                                        ("mmap_indexed_binary", "tensorflow", "dali"),
-                                                        ("png", "pytorch", "dali"), ("npz", "pytorch", "dali"),
-                                                        ("jpeg", "pytorch", "dali"), ("hdf5", "pytorch", "dali"),
-                                                        ("csv", "pytorch", "dali"), ("indexed_binary", "pytorch", "dali"),
-                                                        ("mmap_indexed_binary", "pytorch", "dali"),
+@pytest.mark.parametrize("fmt, framework, dataloader, is_even", [("png", "tensorflow","tensorflow", True), ("npz", "tensorflow","tensorflow", True),
+                                                                 ("jpeg", "tensorflow","tensorflow", True), ("tfrecord", "tensorflow","tensorflow", True),
+                                                                 ("hdf5", "tensorflow","tensorflow", True), ("csv", "tensorflow","tensorflow", True),
+                                                                 ("indexed_binary", "tensorflow","tensorflow", True), ("mmap_indexed_binary", "tensorflow","tensorflow", True),
+                                                                 ("png", "pytorch", "pytorch", True), ("npz", "pytorch", "pytorch", True),
+                                                                 ("jpeg", "pytorch", "pytorch", True), ("hdf5", "pytorch", "pytorch", True),
+                                                                 ("csv", "pytorch", "pytorch", True), ("indexed_binary", "pytorch", "pytorch", True),
+                                                                 ("mmap_indexed_binary", "pytorch", "pytorch", True),
+                                                                 ("png", "tensorflow", "dali", True), ("npz", "tensorflow", "dali", True),
+                                                                 ("jpeg", "tensorflow", "dali", True), ("hdf5", "tensorflow", "dali", True),
+                                                                 ("csv", "tensorflow", "dali", True), ("indexed_binary", "tensorflow", "dali", True),
+                                                                 ("mmap_indexed_binary", "tensorflow", "dali", True),
+                                                                 ("png", "pytorch", "dali", True), ("npz", "pytorch", "dali", True),
+                                                                 ("jpeg", "pytorch", "dali", True), ("hdf5", "pytorch", "dali", True),
+                                                                 ("csv", "pytorch", "dali", True), ("indexed_binary", "pytorch", "dali", True),
+                                                                 ("mmap_indexed_binary", "pytorch", "dali", True),
+                                                                 ("png", "tensorflow","tensorflow", False), ("npz", "tensorflow","tensorflow", False),
+                                                                 ("jpeg", "tensorflow","tensorflow", False), ("tfrecord", "tensorflow","tensorflow", False),
+                                                                 ("hdf5", "tensorflow","tensorflow", False), ("csv", "tensorflow","tensorflow", False),
+                                                                 ("indexed_binary", "tensorflow","tensorflow", False), ("mmap_indexed_binary", "tensorflow","tensorflow", False),
+                                                                 ("png", "pytorch", "pytorch", False), ("npz", "pytorch", "pytorch", False),
+                                                                 ("jpeg", "pytorch", "pytorch", False), ("hdf5", "pytorch", "pytorch", False),
+                                                                 ("csv", "pytorch", "pytorch", False), ("indexed_binary", "pytorch", "pytorch", False),
+                                                                 ("mmap_indexed_binary", "pytorch", "pytorch", False),
+                                                                 ("png", "tensorflow", "dali", False), ("npz", "tensorflow", "dali", False),
+                                                                 ("jpeg", "tensorflow", "dali", False), ("hdf5", "tensorflow", "dali", False),
+                                                                 ("csv", "tensorflow", "dali", False), ("indexed_binary", "tensorflow", "dali", False),
+                                                                 ("mmap_indexed_binary", "tensorflow", "dali", False),
+                                                                 ("png", "pytorch", "dali", False), ("npz", "pytorch", "dali", False),
+                                                                 ("jpeg", "pytorch", "dali", False), ("hdf5", "pytorch", "dali", False),
+                                                                 ("csv", "pytorch", "dali", False), ("indexed_binary", "pytorch", "dali", False),
+                                                                 ("mmap_indexed_binary", "pytorch", "dali", False),
                                                                  ])
-def test_train(fmt, framework, dataloader) -> None:
+def test_train(fmt, framework, dataloader, is_even) -> None:
     init()
     clean()
+    if is_even:
+        num_files = 16
+    else:
+        num_files = 17
     if comm.rank == 0:
         logging.info("")
         logging.info("=" * 80)
@@ -424,7 +444,7 @@ def test_train(fmt, framework, dataloader) -> None:
                          'workload.train.computation_time=0.01', \
                          'workload.evaluation.eval_time=0.005', \
                          '++workload.train.epochs=1', \
-                         '++workload.dataset.num_files_train=16', \
+                         f'++workload.dataset.num_files_train={num_files}', \
                          '++workload.reader.read_threads=1'])
     benchmark = run_benchmark(cfg)
     #clean()