Skip to content

Commit

Permalink
Fixed a typo and a bug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghh04 committed May 31, 2024
1 parent eeeae3b commit 75bdee2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
20 changes: 15 additions & 5 deletions dlio_benchmark/reader/tf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _parse_image(self, serialized):
'image': tf.io.FixedLenFeature([], tf.string),
'size': tf.io.FixedLenFeature([], tf.int64)
}
parsed_example = tf.io.parse_example(serialized=serialized, features=features)
#parsed_example = tf.io.parse_example(serialized=serialized, features=features)
# Get the image as raw bytes.
#image_raw = parsed_example['image']
#dimension = tf.cast(parsed_example['size'], tf.int32).numpy()
Expand All @@ -94,10 +94,20 @@ def next(self):
self._dataset = self._dataset.shuffle(buffer_size=self._args.shuffle_size)

self._dataset = self._dataset.shard(num_shards=self._args.comm_size, index=self._args.my_rank)
self._dataset = self._dataset.batch(self.batch_size, drop_remainder=True)
self._dataset = self._dataset.map(
lambda x: tf.py_function(func=self._parse_image, inp=[x], Tout=[tf.uint8]),
num_parallel_calls=self._args.computation_threads)
if self._args.computation_threads==0:
self._dataset = self._dataset.batch(self.batch_size, drop_remainder=True)
else:
if self._args.computation_threads <= self.batch_size:
self._dataset = self._dataset.batch(self.batch_size, drop_remainder=True)
self._dataset = self._dataset.map(
lambda x: tf.py_function(func=self._parse_image, inp=[x], Tout=[tf.uint8]),
num_parallel_calls=self._args.computation_threads)
else:
self._dataset = self._dataset.batch(self._args.computation_threads)
self._dataset = self._dataset.map(
lambda x: tf.py_function(func=self._parse_image, inp=[x], Tout=[tf.uint8]),
num_parallel_calls=self._args.computation_threads)
self._dataset = self._dataset.unbatch(self.batch_size)
self._dataset = self._dataset.repeat(self._args.epochs)
total = math.ceil(len(self._file_list)/self._args.comm_size / self.batch_size * self._args.num_samples_per_file)
return self._dataset.take(total*self._args.epochs).prefetch(buffer_size=self._args.prefetch_size)
Expand Down
4 changes: 2 additions & 2 deletions dlio_benchmark/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,8 @@ def LoadConfig(args, config):
args.data_loader_sampler = DataLoaderSampler(reader['data_loader_sampler'])
if 'read_threads' in reader:
args.read_threads = reader['read_threads']
if 'computatation_threads' in reader:
args.computatation_threads = reader['computatation_threads']
if 'computation_threads' in reader:
args.computation_threads = reader['computation_threads']
if 'batch_size' in reader:
args.batch_size = reader['batch_size']
if 'batch_size_eval' in reader:
Expand Down
9 changes: 3 additions & 6 deletions dlio_benchmark/utils/statscounter.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def end_block(self, epoch, block, steps_taken):
self.per_epoch_stats[epoch][f'block{block}']['duration'] = duration
logging.info(f"{utcnow()} Epoch {epoch} - Block {block} [Training] Accelerator Utilization [AU] (%): {self.output[epoch]['au'][f'block{block}']:.4f}")
logging.info(f"{utcnow()} Epoch {epoch} - Block {block} [Training] Throughput (samples/second): {self.output[epoch]['throughput'][f'block{block}']*self.comm_size:.4f}")
logging.info(f"{utcnow()} Epoch {epoch} - Block {block} [Training] Computation time per step (second): {np.mean(self.output[epoch]['compute'][f'block{block}'][1:-1]):.4f}+/-{np.std(self.output[epoch]['compute'][f'block{block}'][1:-1]):.4f} (set value: {self.args.computation_time})")

def start_ckpt(self, epoch, block, steps_taken):
if self.my_rank == 0:
Expand Down Expand Up @@ -336,11 +337,10 @@ def compute_metrics_train(self, epoch, block):
throughput = (len(self.output[epoch]['compute'][key]) - 2)/(total_time)*self.batch_size
self.output[epoch]['au'][key] = au*100
self.output[epoch]['throughput'][key] = throughput
self.output[epoch]['compute'][key] = total_compute_time

def compute_metrics_eval(self, epoch):
key = 'eval'
total_compute_time = np.sum(self.output[epoch]['compute'][key][1:])
total_compute_time = np.sum(self.output[epoch]['compute'][key][1:-1])
if (total_compute_time==0):
au=0.0
else:
Expand All @@ -349,19 +349,16 @@ def compute_metrics_eval(self, epoch):
throughput = len(self.output[epoch]['compute'][key])/(self.end_timestamp - self.start_timestamp)*self.batch_size_eval
self.output[epoch]['au'][key] = au*100
self.output[epoch]['throughput'][key] = throughput
self.output[epoch]['compute'][key] = total_compute_time


def eval_batch_loaded(self, epoch, step):
    """Record how long loading one evaluation batch took.

    Appends the elapsed wall-clock time since ``self.start_time_loading``
    to the per-epoch 'load'/'eval' series and emits a debug log line.
    """
    duration = time() - self.start_time_loading
    load_series = self.output[epoch]['load']['eval']
    load_series.append(duration)
    logging.debug(f"{utcnow()} Rank {self.my_rank} step {step} loaded {self.batch_size_eval} samples in {duration} s")


def eval_batch_processed(self, epoch, step):
    """Record total processing time and pure compute time for one eval batch.

    Both durations are measured from the same ``time()`` snapshot:
    'proc' is since loading started, 'compute' since computation started.
    """
    now = time()
    duration = now - self.start_time_loading
    compute_time = now - self.start_time_compute
    epoch_stats = self.output[epoch]
    epoch_stats['proc']['eval'].append(duration)
    epoch_stats['compute']['eval'].append(compute_time)
    logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size_eval} samples in {duration} s")
Expand Down

0 comments on commit 75bdee2

Please sign in to comment.