Skip to content

Commit

Permalink
Merge branch 'main' into bugfix/checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
hariharan-devarajan authored Oct 30, 2024
2 parents 85f8490 + cda4df3 commit 2ffa7cb
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 19 deletions.
2 changes: 1 addition & 1 deletion dlio_benchmark/data_generator/hdf5_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def generate(self):
dim2 = dim[2*i+1]
records = np.random.randint(255, size=(dim1, dim2, self.num_samples), dtype=np.uint8)
out_path_spec = self.storage.get_uri(self._file_list[i])
progress(i+1, self.total_files_to_generate, "Generating NPZ Data")
progress(i+1, self.total_files_to_generate, "Generating HDF5 Data")
hf = h5py.File(out_path_spec, 'w')
hf.create_dataset('records', (self.num_samples, dim1, dim2), chunks=chunks, compression=compression,
compression_opts=compression_level, dtype=np.uint8, data=records)
Expand Down
2 changes: 1 addition & 1 deletion dlio_benchmark/data_generator/indexed_binary_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def generate(self):
logging.info(f"{utcnow()} Starting Sample generation. ")

fh = MPI.File.Open(comm, out_path_spec, amode)
samples_per_loop = int(MB / sample_size)
samples_per_loop = int(MB * 16 / sample_size)

for sample_index in range(self.my_rank*samples_per_rank, samples_per_rank*(self.my_rank+1), samples_per_loop):
#logging.info(f"{utcnow()} rank {self.my_rank} writing {sample_index} * {samples_per_loop} for {samples_per_rank} samples")
Expand Down
5 changes: 4 additions & 1 deletion dlio_benchmark/data_loader/dali_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ def next(self):
self.read(True)
while step < self.num_samples // self.batch_size:
for pipe in self.pipelines:
outputs = pipe.share_outputs()
try:
outputs = pipe.share_outputs()
except StopIteration:
return
logging.debug(f"{utcnow()} Output batch {step} {len(outputs)}")
yield outputs
step += 1
Expand Down
9 changes: 6 additions & 3 deletions dlio_benchmark/data_loader/native_dali_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ def next(self):
for pipeline in self.pipelines:
pipeline.reset()
for step in range(num_samples // batch_size):
for batch in self._dataset:
logging.debug(f"{utcnow()} Creating {len(batch)} batches by {self._args.my_rank} rank ")
yield batch
try:
for batch in self._dataset:
logging.debug(f"{utcnow()} Creating {len(batch)} batches by {self._args.my_rank} rank ")
yield batch
except StopIteration:
return
self.epoch_number += 1
dlp.update(epoch=self.epoch_number)
@dlp.log
Expand Down
17 changes: 8 additions & 9 deletions dlio_benchmark/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def _eval(self, epoch):
eval_time = 0.0
if self.eval_time > 0:
if self.eval_time_stdev > 0:
eval_time = random.normal(self.eval_time, self.eval_time_stdev)
eval_time = abs(random.normal(self.eval_time, self.eval_time_stdev))
else:
eval_time = self.eval_time
self.framework.compute(batch, epoch, step, eval_time)
Expand Down Expand Up @@ -257,6 +257,12 @@ def _train(self, epoch):
loader = self.framework.get_loader(dataset_type=DatasetType.TRAIN)
t0 = time()
for batch in dlp.iter(loader.next()):
if overall_step > max_steps or ((self.total_training_steps > 0) and (overall_step > self.total_training_steps)):
if self.args.my_rank == 0:
logging.info(f"{utcnow()} Maximum number of steps reached")
if (block_step != 1 and self.do_checkpoint) or (not self.do_checkpoint):
self.stats.end_block(epoch, block, block_step - 1)
break
self.stats.batch_loaded(epoch, overall_step, block, t0)
# Log a new block, unless it's the first one which we've already logged before the loop
if block_step == 1 and block != 1:
Expand All @@ -265,7 +271,7 @@ def _train(self, epoch):
if self.computation_time > 0:
self.framework.trace_object("Train", overall_step, 1)
if self.computation_time_stdev > 0:
computation_time = random.normal(self.computation_time, self.computation_time_stdev)
computation_time = abs(random.normal(self.computation_time, self.computation_time_stdev))
else:
computation_time = self.computation_time
self.framework.compute(batch, epoch, block_step, computation_time)
Expand All @@ -283,13 +289,6 @@ def _train(self, epoch):
self.next_checkpoint_step += self.steps_between_checkpoints
else:
block_step += 1

if overall_step >= max_steps or overall_step == self.total_training_steps:
if self.args.my_rank == 0:
logging.info(f"{utcnow()} Maximum number of steps reached")
if (block_step != 1 and self.do_checkpoint) or (not self.do_checkpoint):
self.stats.end_block(epoch, block, block_step - 1)
break
overall_step += 1
t0 = time()
self.comm.barrier()
Expand Down
3 changes: 0 additions & 3 deletions dlio_benchmark/reader/indexed_binary_mmap_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ def load_index_file(self, global_sample_idx, filename, sample_index):
bin_buffer_mmap = np.memmap(sz_file, mode='r', order='C')
bin_buffer = memoryview(bin_buffer_mmap)
self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint64))
bin_buffer_mmap = np.memmap(filename, mode='r', order='C')
bin_buffer = memoryview(bin_buffer_mmap)
self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8)

@dlp.log
def load_index(self):
Expand Down
2 changes: 1 addition & 1 deletion docs/source/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ dataset
* - compression_level
- 4
- level of compression for gzip
* - chunking
* - enable_chunking
- False
- whether to use chunking to store hdf5.
* - chunk_size
Expand Down

0 comments on commit 2ffa7cb

Please sign in to comment.