From b2bf926df8ad42e4f1556b52ff7ca108c1f0cd25 Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Sat, 5 Oct 2024 00:54:36 -0700 Subject: [PATCH 01/12] New improved modelling for LLM Deepspeed. --- .../checkpointing/base_checkpointing.py | 40 ++++++++++++++----- .../configs/workload/megatron_deepspeed.yaml | 9 +++-- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/dlio_benchmark/checkpointing/base_checkpointing.py b/dlio_benchmark/checkpointing/base_checkpointing.py index a9e184cf..72ccfb8f 100644 --- a/dlio_benchmark/checkpointing/base_checkpointing.py +++ b/dlio_benchmark/checkpointing/base_checkpointing.py @@ -15,6 +15,7 @@ limitations under the License. """ import os +import math from abc import ABC, abstractmethod from dlio_benchmark.common.enumerations import CheckpointLocationType @@ -53,7 +54,7 @@ def __init__(self, ext): self.layer_state = dict() for index, state in enumerate(self.args.layer_parameters): if state > 0: - self.layer_state[str(index)] = self.get_tensor(state) + self.layer_state[str(index)] = self.get_tensor(state / self.args.tensor_parallelism) @abstractmethod def get_tensor(self, size): @@ -66,24 +67,41 @@ def save_state(self, suffix, state): def get_name(self, suffix): return os.path.join(self.args.checkpoint_folder, f"{suffix}.{self.ext}") + def get_layer_index(self, rank, tensor_parallelism, pipeline_parallelism, total_layers): + if tensor_parallelism > 1: + total_layers = total_layers + tensor_parallelism + + divisible_layers = total_layers - (total_layers % pipeline_parallelism) + min_layers_per_pipeline = divisible_layers // pipeline_parallelism + max_layer_per_pipeline = min_layers_per_pipeline + 1 + pipeline_rank = (rank // tensor_parallelism) % pipeline_parallelism + left_layers = total_layers - divisible_layers + num_layers_per_pipeline = max_layer_per_pipeline + if pipeline_rank >= left_layers: + num_layers_per_pipeline = min_layers_per_pipeline + if pipeline_rank < left_layers: + start_layer = pipeline_rank * max_layer_per_pipeline + end_layer = start_layer + num_layers_per_pipeline - 1 + else: + start_layer = left_layers * max_layer_per_pipeline + (pipeline_rank - left_layers) * (min_layers_per_pipeline) + end_layer = start_layer + num_layers_per_pipeline - 1 + return start_layer, end_layer + @abstractmethod def checkpoint(self, epoch, step_number): - rank_to_checkpoint = DLIOMPI.get_instance().rank() + my_rank = DLIOMPI.get_instance().rank() + rank_to_checkpoint = my_rank if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: rank_to_checkpoint = 0 - if rank_to_checkpoint == DLIOMPI.get_instance().rank(): - my_rank = DLIOMPI.get_instance().rank() + if rank_to_checkpoint == my_rank: if self.model_state: self.save_state(suffix=f"model-{epoch}-{step_number}-{my_rank}", state=self.model_state) if self.optimization_state: self.save_state(suffix=f"optimizer-{epoch}-{step_number}-{my_rank}", state=self.optimization_state) - if rank_to_checkpoint % self.args.pipeline_parallelism == 0: - if self.layer_state and self.args.num_layers > 0: - total_layers = self.args.num_layers - if self.args.tensor_parallelism > 1: - total_layers = total_layers + self.args.tensor_parallelism - for layer in range(total_layers): - self.save_state(suffix=f"layer-{layer}-{epoch}-{step_number}-{my_rank}", state=self.layer_state) + + start_layer, end_layer = self.get_layer_index(my_rank,self.args.tensor_parallelism, self.args.pipeline_parallelism, self.args.num_layers) + for layer_index in range(start_layer, end_layer + 1): + self.save_state(suffix=f"layer-{layer_index}-{epoch}-{step_number}-{my_rank}", state=self.layer_state) @abstractmethod def finalize(self): diff --git a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml index 20e4a3aa..9353c4b7 100644 --- a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml +++ b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml @@ -23,7 +23,8 @@ reader: sample_shuffle: seed train: - epochs: 311541 + total_training_steps: 311541 + epochs: 1 computation_time: 0.03 # every iteration has 290 steps and each iteration is 8.9 sec. checkpoint: @@ -32,5 +33,7 @@ checkpoint: model_size: 30102 type: all_ranks optimization_groups: [1009254400, 865075200, 793600] - num_layers: 44 - layer_parameters: [129761280, 20971520] + num_layers: 40 + pipeline_parallelism: 8 + tensor_parallelism: 4 + layer_parameters: [52583936, 209715200] From 9f12a5063256524fbe91d4bf96727ba858e14dbb Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Sat, 5 Oct 2024 01:02:01 -0700 Subject: [PATCH 02/12] make size of tensor even integer. --- dlio_benchmark/checkpointing/base_checkpointing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlio_benchmark/checkpointing/base_checkpointing.py b/dlio_benchmark/checkpointing/base_checkpointing.py index 72ccfb8f..d7a4bc0c 100644 --- a/dlio_benchmark/checkpointing/base_checkpointing.py +++ b/dlio_benchmark/checkpointing/base_checkpointing.py @@ -54,7 +54,7 @@ def __init__(self, ext): self.layer_state = dict() for index, state in enumerate(self.args.layer_parameters): if state > 0: - self.layer_state[str(index)] = self.get_tensor(state / self.args.tensor_parallelism) + self.layer_state[str(index)] = self.get_tensor(state // self.args.tensor_parallelism) @abstractmethod def get_tensor(self, size): From c41272518f314d29bc53b84ffd3fb1465ae69a4e Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Sat, 5 Oct 2024 01:55:46 -0700 Subject: [PATCH 03/12] fixed formula --- tests/dlio_benchmark_test.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/dlio_benchmark_test.py b/tests/dlio_benchmark_test.py index 99e2faa4..834d7ab8 100644 --- a/tests/dlio_benchmark_test.py +++ b/tests/dlio_benchmark_test.py @@ -228,8 +228,8 @@ def test_iostat_profiling() -> None: ("pytorch", 1024, [1024, 128], 2, [16], "all_ranks"), ("tensorflow", 1024, [1024, 128], 2, [16], "rank_zero"), ("pytorch", 1024, [1024, 128], 2, [16], "rank_zero"), - ("tensorflow", 1024, [128], 1, [], "all_ranks"), - ("pytorch", 1024, [128], 1, [], "all_ranks")]) + ("tensorflow", 1024, [128], 1, [16], "all_ranks"), + ("pytorch", 1024, [128], 1, [16], "all_ranks")]) def test_checkpoint_epoch(framework, model_size, optimizers, num_layers, layer_params, type) -> None: init() clean() @@ -239,6 +239,8 @@ def test_checkpoint_epoch(framework, model_size, optimizers, num_layers, layer_p logging.info(f" DLIO test for checkpointing at the end of epochs") logging.info("=" * 80) with initialize_config_dir(version_base=None, config_dir=config_dir): + epochs = 8 + epoch_per_ckp = 2 cfg = compose(config_name='config', overrides=[f'++workload.framework={framework}', f'++workload.reader.data_loader={framework}', @@ -246,8 +248,8 @@ def test_checkpoint_epoch(framework, model_size, optimizers, num_layers, layer_p '++workload.workflow.generate_data=True', '++workload.train.computation_time=0.01', '++workload.evaluation.eval_time=0.005', - '++workload.train.epochs=8', '++workload.workflow.checkpoint=True', - '++workload.checkpoint.epochs_between_checkpoints=2', + f'++workload.train.epochs={epochs}', '++workload.workflow.checkpoint=True', + f'++workload.checkpoint.epochs_between_checkpoints={epoch_per_ckp}', f'++workload.checkpoint.type={type}', f'++workload.checkpoint.model_size={model_size}', f'++workload.checkpoint.optimization_groups={optimizers}', @@ -267,11 +269,17 @@ def test_checkpoint_epoch(framework, model_size, optimizers, num_layers, layer_p nranks = 1 if type == "all_ranks": nranks = comm.size + num_model_files = 1 + num_optimizer_files = 1 + num_layer_files = num_layers + files_per_checkpoint = (num_model_files + num_optimizer_files + num_layer_files) * nranks + comm.Barrier() if framework == "tensorflow": - num_check_files = 8 / 2 * (2 + 2 + 2*n) * nranks + 1 + file_per_ckp = 2 + num_check_files = epochs / epoch_per_ckp * files_per_checkpoint * file_per_ckp + 1 assert (len(load_bin) == num_check_files), f"files produced are {len(load_bin)} {num_check_files} {load_bin} " if framework == "pytorch": - num_check_files = 8 / 2 * (1 + 1 + n) * nranks + num_check_files = epochs / epoch_per_ckp * files_per_checkpoint assert (len(load_bin) == num_check_files), f"files produced are {len(load_bin)} {num_check_files} {load_bin}" comm.Barrier() if comm.rank == 0: From b557f087ac9a90019a8cf7b4f016285eb1bf936a Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Sat, 5 Oct 2024 01:56:25 -0700 Subject: [PATCH 04/12] fixed calculation --- tests/dlio_benchmark_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/dlio_benchmark_test.py b/tests/dlio_benchmark_test.py index 834d7ab8..f9a63d0f 100644 --- a/tests/dlio_benchmark_test.py +++ b/tests/dlio_benchmark_test.py @@ -273,7 +273,6 @@ def test_checkpoint_epoch(framework, model_size, optimizers, num_layers, layer_p num_optimizer_files = 1 num_layer_files = num_layers files_per_checkpoint = (num_model_files + num_optimizer_files + num_layer_files) * nranks - comm.Barrier() if framework == "tensorflow": file_per_ckp = 2 num_check_files = epochs / epoch_per_ckp * files_per_checkpoint * file_per_ckp + 1 From acc2b4f8da414d31b46434f5cd465910a636812f Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Sat, 5 Oct 2024 02:02:51 -0700 Subject: [PATCH 05/12] switch order of faster tests --- .github/workflows/ci.yml | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c90663d..ff052dc9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,6 +78,20 @@ jobs: source ${VENV_PATH}/bin/activate pip install --upgrade pip pip install -r requirements.txt + - name: test_checkpoint_epoch + run: | + source ${VENV_PATH}/bin/activate + mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers0-2-layer_params0-all_ranks] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers1-2-layer_params1-all_ranks] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers2-2-layer_params2-rank_zero] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers3-2-layer_params3-rank_zero] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers4-1-layer_params4-all_ranks] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers5-1-layer_params5-all_ranks] -v + rm -rf data + - name: test_checkpoint_step + run: | + source ${VENV_PATH}/bin/activate + mpirun -np 2 pytest -k test_checkpoint_step -v - name: test_gen_data run: | source ${VENV_PATH}/bin/activate @@ -182,20 +196,6 @@ jobs: mpirun -np 2 pytest -k test_custom_storage_root_train[mmap_indexed_binary-tensorflow] -v mpirun -np 2 pytest -k test_custom_storage_root_train[mmap_indexed_binary-pytorch] -v rm -rf data - - name: test_checkpoint_epoch - run: | - source ${VENV_PATH}/bin/activate - mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers0-2-layer_params0-all_ranks] -v - mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers1-2-layer_params1-all_ranks] -v - mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers2-2-layer_params2-rank_zero] -v - mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers3-2-layer_params3-rank_zero] -v - mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers4-1-layer_params4-all_ranks] -v - mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers5-1-layer_params5-all_ranks] -v - rm -rf data - - name: test_checkpoint_step - run: | - source ${VENV_PATH}/bin/activate - mpirun -np 2 pytest -k test_checkpoint_step -v - name: test_eval run: | source ${VENV_PATH}/bin/activate From b4000d3b5696f6e425a0ad6448fa6977afea80dc Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Thu, 10 Oct 2024 10:20:04 -0700 Subject: [PATCH 06/12] fix epochs --- dlio_benchmark/configs/workload/megatron_deepspeed.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml index 9353c4b7..52e6f299 100644 --- a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml +++ b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml @@ -23,8 +23,7 @@ reader: sample_shuffle: seed train: - total_training_steps: 311541 - epochs: 1 + epochs: 311541 computation_time: 0.03 # every iteration has 290 steps and each iteration is 8.9 sec. checkpoint: From aabe14999fcb29714d1556dd05fc5c81890cab14 Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Thu, 10 Oct 2024 10:51:37 -0700 Subject: [PATCH 07/12] more accurate representation of deepspeed. --- dlio_benchmark/configs/workload/megatron_deepspeed.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml index 52e6f299..84b5ea9c 100644 --- a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml +++ b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml @@ -24,11 +24,12 @@ reader: train: epochs: 311541 + total_training_steps: 64 computation_time: 0.03 # every iteration has 290 steps and each iteration is 8.9 sec. checkpoint: checkpoint_folder: checkpoints/megatron-deepspeed - steps_between_checkpoints: 1000 + epochs_between_checkpoints: 1000 model_size: 30102 type: all_ranks optimization_groups: [1009254400, 865075200, 793600] From 7104043ed790a994b62cef102b6ca1b6a99c6906 Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Thu, 10 Oct 2024 12:09:43 -0700 Subject: [PATCH 08/12] use mpiio to generate offsets --- .../indexed_binary_generator.py | 44 +++++++++++-------- .../reader/indexed_binary_mmap_reader.py | 4 +- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/dlio_benchmark/data_generator/indexed_binary_generator.py b/dlio_benchmark/data_generator/indexed_binary_generator.py index 6a7013b9..90c4ee6a 100644 --- a/dlio_benchmark/data_generator/indexed_binary_generator.py +++ b/dlio_benchmark/data_generator/indexed_binary_generator.py @@ -54,7 +54,7 @@ def generate(self): samples_processed = 0 total_samples = self.total_files_to_generate * self.num_samples dim = self.get_dimension(self.total_files_to_generate) - logging.info(dim) + # logging.info(dim) if self.total_files_to_generate <= self.comm_size: # Use collective I/O # we need even number os samples for collective I/O @@ -68,6 +68,30 @@ def generate(self): out_path_spec = self.storage.get_uri(self._file_list[file_index]) out_path_spec_off_idx = self.index_file_path_off(out_path_spec) out_path_spec_sz_idx = self.index_file_path_size(out_path_spec) + + if self.my_rank == 0: + logging.info(f"{utcnow()} Starting metadata generation. ") + fh_off = MPI.File.Open(comm, out_path_spec_off_idx, amode) + fh_sz = MPI.File.Open(comm, out_path_spec_sz_idx, amode) + elements_per_loop = min(int(MB / np.dtype(np.uint32).itemsize), samples_per_rank) + offsets_processed=0 + for element_index in range(self.my_rank*samples_per_rank, samples_per_rank*(self.my_rank+1), elements_per_loop): + myfmt = 'Q' * elements_per_loop + offsets = np.array(range(self.my_rank * elements_per_loop * sample_size, + (self.my_rank + 1) * elements_per_loop * sample_size, + sample_size), dtype=np.uint32) + + sizes = np.array([sample_size] * elements_per_loop, dtype=np.uint32) + offset = element_index * np.dtype(np.uint32).itemsize + fh_off.Write_at_all(offset, offsets) + fh_sz.Write_at_all(offset, sizes) + offsets_processed += elements_per_loop + progress(offsets_processed * self.comm_size, total_samples, "Generating Indexed Binary Data Index for Samples") + fh_off.Close() + fh_sz.Close() + if self.my_rank == 0: + logging.info(f"{utcnow()} Starting Sample generation. ") + fh = MPI.File.Open(comm, out_path_spec, amode) samples_per_loop = int(MB / sample_size) @@ -79,24 +103,6 @@ def generate(self): samples_processed += samples_per_loop progress(samples_processed * self.comm_size, total_samples, "Generating Indexed Binary Data Samples") fh.Close() - logging.info(f"{utcnow()} rank {self.my_rank} writing metadata") - off_file = open(out_path_spec_off_idx, "wb") - sz_file = open(out_path_spec_sz_idx, "wb") - if int(file_index / self.comm_size) == self.my_rank: - # Write offsets - myfmt = 'Q' * self.num_samples - data_to_write = self.num_samples * sample_size - samples_to_write = self.num_samples - offsets = range(0, data_to_write, sample_size) - offsets = offsets[:samples_to_write] - binary_offsets = struct.pack(myfmt, *offsets) - off_file.write(binary_offsets) - - # Write sizes - myfmt = 'Q' * samples_to_write - sample_sizes = [sample_size] * samples_to_write - binary_sizes = struct.pack(myfmt, *sample_sizes) - sz_file.write(binary_sizes) else: for i in dlp.iter(range(self.my_rank, int(self.total_files_to_generate), self.comm_size)): dim1 = dim[2*i] diff --git a/dlio_benchmark/reader/indexed_binary_mmap_reader.py b/dlio_benchmark/reader/indexed_binary_mmap_reader.py index 500f9d2c..71be887c 100644 --- a/dlio_benchmark/reader/indexed_binary_mmap_reader.py +++ b/dlio_benchmark/reader/indexed_binary_mmap_reader.py @@ -57,10 +57,10 @@ def load_index_file(self, global_sample_idx, filename, sample_index): self.file_map_ibr[filename] = [] bin_buffer_mmap = np.memmap(offset_file, mode='r', order='C') bin_buffer = memoryview(bin_buffer_mmap) - self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint8)) + self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint32)) bin_buffer_mmap = np.memmap(sz_file, mode='r', order='C') bin_buffer = memoryview(bin_buffer_mmap) - self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint8)) + self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint32)) @dlp.log def load_index(self): From 6b506056b27c89f75cb97b0794481546c5e98a0a Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Thu, 10 Oct 2024 15:12:17 -0700 Subject: [PATCH 09/12] fxied deepspeed --- .../configs/workload/megatron_deepspeed.yaml | 9 +++-- .../reader/indexed_binary_mmap_reader.py | 34 +++++++++++++------ 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml index 84b5ea9c..d77614f0 100644 --- a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml +++ b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml @@ -17,19 +17,18 @@ dataset: reader: data_loader: pytorch - batch_size: 1024 + batch_size: 16 read_threads: 1 file_shuffle: seed sample_shuffle: seed train: - epochs: 311541 - total_training_steps: 64 - computation_time: 0.03 # every iteration has 290 steps and each iteration is 8.9 sec. + epochs: 3 + computation_time: 2.44 # 2.44 sec per step checkpoint: checkpoint_folder: checkpoints/megatron-deepspeed - epochs_between_checkpoints: 1000 + steps_between_checkpoints: 1000 model_size: 30102 type: all_ranks optimization_groups: [1009254400, 865075200, 793600] diff --git a/dlio_benchmark/reader/indexed_binary_mmap_reader.py b/dlio_benchmark/reader/indexed_binary_mmap_reader.py index 71be887c..f472f134 100644 --- a/dlio_benchmark/reader/indexed_binary_mmap_reader.py +++ b/dlio_benchmark/reader/indexed_binary_mmap_reader.py @@ -36,8 +36,8 @@ class IndexedBinaryMMapReader(FormatReader): def __init__(self, dataset_type, thread_index, epoch): super().__init__(dataset_type, thread_index) self.file_map_ibr = {} - self.load_index() self.buffer_map = {} + self.load_index() def index_file_path_off(self, prefix_path): return prefix_path + '.off.idx' @@ -61,6 +61,9 @@ def load_index_file(self, global_sample_idx, filename, sample_index): bin_buffer_mmap = np.memmap(sz_file, mode='r', order='C') bin_buffer = memoryview(bin_buffer_mmap) self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint32)) + bin_buffer_mmap = np.memmap(filename, mode='r', order='C') + bin_buffer = memoryview(bin_buffer_mmap) + self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8) @dlp.log def load_index(self): @@ -76,16 +79,13 @@ def load_index(self): @dlp.log def open(self, filename): - super().open(filename) - bin_buffer_mmap = np.memmap(filename, mode='r', order='C') - bin_buffer = memoryview(bin_buffer_mmap) - self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8) - return bin_buffer_mmap + super().open(filename) + return self.buffer_map[filename] @dlp.log def close(self, filename): super().close(filename) - self.open_file_map[filename]._mmap.close() + @dlp.log def get_sample(self, filename, sample_index): @@ -93,7 +93,6 @@ def get_sample(self, filename, sample_index): buffer = self.buffer_map[filename] offset = self.file_map_ibr[filename][0][sample_index] size = self.file_map_ibr[filename][1][sample_index] - logging.debug(f"reading sample from offset {offset} of size {size} from file {filename}") image = buffer[offset:offset+size] dlp.update(image_size=size) @@ -103,14 +102,27 @@ def next(self): @dlp.log def read_index(self, image_idx, step): - return super().read_index(image_idx, step) + filename, sample_index = self.global_index_map[image_idx] + self.get_sample(filename, sample_index) + return self._args.resized_image @dlp.log def finalize(self): - return super().finalize() + super().finalize() + if self._args.data_loader_sampler == DataLoaderSampler.ITERATIVE: + for global_sample_idx, filename, sample_index in self.file_map[self.thread_index]: + self.buffer_map[filename]._mmap.close() + self.file_map_ibr[filename][0]._mmap.close() + self.file_map_ibr[filename][1]._mmap.close() + elif self._args.data_loader_sampler == DataLoaderSampler.INDEX: + for global_sample_idx, (filename, sample_index) in self.global_index_map.items(): + self.buffer_map[filename]._mmap.close() + self.file_map_ibr[filename][0]._mmap.close() + self.file_map_ibr[filename][1]._mmap.close() + def is_index_based(self): return True def is_iterator_based(self): - return True \ No newline at end of file + return True From b05eb73e173fb6db5bf362666d9e2016f35cb0f8 Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Thu, 10 Oct 2024 15:49:56 -0700 Subject: [PATCH 10/12] fixed type for overflow warning --- .../data_generator/indexed_binary_generator.py | 10 +++++----- dlio_benchmark/reader/indexed_binary_mmap_reader.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dlio_benchmark/data_generator/indexed_binary_generator.py b/dlio_benchmark/data_generator/indexed_binary_generator.py index 90c4ee6a..9d14a8c7 100644 --- a/dlio_benchmark/data_generator/indexed_binary_generator.py +++ b/dlio_benchmark/data_generator/indexed_binary_generator.py @@ -73,16 +73,16 @@ def generate(self): logging.info(f"{utcnow()} Starting metadata generation. ") fh_off = MPI.File.Open(comm, out_path_spec_off_idx, amode) fh_sz = MPI.File.Open(comm, out_path_spec_sz_idx, amode) - elements_per_loop = min(int(MB / np.dtype(np.uint32).itemsize), samples_per_rank) + off_type = np.uint64 + elements_per_loop = min(int(MB / np.dtype(off_type).itemsize), samples_per_rank) offsets_processed=0 for element_index in range(self.my_rank*samples_per_rank, samples_per_rank*(self.my_rank+1), elements_per_loop): - myfmt = 'Q' * elements_per_loop offsets = np.array(range(self.my_rank * elements_per_loop * sample_size, (self.my_rank + 1) * elements_per_loop * sample_size, - sample_size), dtype=np.uint32) + sample_size), dtype=off_type) - sizes = np.array([sample_size] * elements_per_loop, dtype=np.uint32) - offset = element_index * np.dtype(np.uint32).itemsize + sizes = np.array([sample_size] * elements_per_loop, dtype=off_type) + offset = element_index * np.dtype(off_type).itemsize fh_off.Write_at_all(offset, offsets) fh_sz.Write_at_all(offset, sizes) offsets_processed += elements_per_loop diff --git a/dlio_benchmark/reader/indexed_binary_mmap_reader.py b/dlio_benchmark/reader/indexed_binary_mmap_reader.py index f472f134..92141c75 100644 --- a/dlio_benchmark/reader/indexed_binary_mmap_reader.py +++ b/dlio_benchmark/reader/indexed_binary_mmap_reader.py @@ -57,10 +57,10 @@ def load_index_file(self, global_sample_idx, filename, sample_index): self.file_map_ibr[filename] = [] bin_buffer_mmap = np.memmap(offset_file, mode='r', order='C') bin_buffer = memoryview(bin_buffer_mmap) - self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint32)) + self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint64)) bin_buffer_mmap = np.memmap(sz_file, mode='r', order='C') bin_buffer = memoryview(bin_buffer_mmap) - self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint32)) + self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint64)) bin_buffer_mmap = np.memmap(filename, mode='r', order='C') bin_buffer = memoryview(bin_buffer_mmap) self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8) From 85f849088c0e70c0198f47d6d0520b6acba34963 Mon Sep 17 00:00:00 2001 From: hariharandev1 Date: Thu, 10 Oct 2024 21:51:09 -0700 Subject: [PATCH 11/12] added preprocessing step --- dlio_benchmark/reader/indexed_binary_mmap_reader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dlio_benchmark/reader/indexed_binary_mmap_reader.py b/dlio_benchmark/reader/indexed_binary_mmap_reader.py index 92141c75..3e0313ef 100644 --- a/dlio_benchmark/reader/indexed_binary_mmap_reader.py +++ b/dlio_benchmark/reader/indexed_binary_mmap_reader.py @@ -104,6 +104,7 @@ def next(self): def read_index(self, image_idx, step): filename, sample_index = self.global_index_map[image_idx] self.get_sample(filename, sample_index) + self.preprocess() return self._args.resized_image @dlp.log From ecc384a8561f5760129f26e7f872798a73e4a24f Mon Sep 17 00:00:00 2001 From: Hariharan Devarajan Date: Wed, 30 Oct 2024 11:26:32 -0500 Subject: [PATCH 12/12] Update indexed_binary_mmap_reader.py --- dlio_benchmark/reader/indexed_binary_mmap_reader.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dlio_benchmark/reader/indexed_binary_mmap_reader.py b/dlio_benchmark/reader/indexed_binary_mmap_reader.py index 7a06323d..3e0313ef 100644 --- a/dlio_benchmark/reader/indexed_binary_mmap_reader.py +++ b/dlio_benchmark/reader/indexed_binary_mmap_reader.py @@ -61,6 +61,9 @@ def load_index_file(self, global_sample_idx, filename, sample_index): bin_buffer_mmap = np.memmap(sz_file, mode='r', order='C') bin_buffer = memoryview(bin_buffer_mmap) self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint64)) + bin_buffer_mmap = np.memmap(filename, mode='r', order='C') + bin_buffer = memoryview(bin_buffer_mmap) + self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8) @dlp.log def load_index(self):