Commit

removed AU expectation; and updated throughput calculation
zhenghh04 committed May 29, 2024
2 parents e1ed0ab + 2cc6b1d commit 803b51a
Showing 2 changed files with 6 additions and 14 deletions.
dlio_benchmark/reader/tf_reader.py (4 changes: 3 additions & 1 deletion)
@@ -85,21 +85,23 @@ def next(self):
             f"{utcnow()} Reading {len(self._file_list)} files thread {self.thread_index} rank {self._args.my_rank}")
         self._dataset = tf.data.TFRecordDataset(filenames=self._file_list, buffer_size=self._args.transfer_size,
                                                 num_parallel_reads=self._args.read_threads)
-        self._dataset = self._dataset.shard(num_shards=self._args.comm_size, index=self._args.my_rank)
+
         if self._args.sample_shuffle != Shuffle.OFF:
             if self._args.sample_shuffle == Shuffle.SEED:
                 self._dataset = self._dataset.shuffle(buffer_size=self._args.shuffle_size,
                                                       seed=self._args.seed)
             else:
                 self._dataset = self._dataset.shuffle(buffer_size=self._args.shuffle_size)
+
+        self._dataset = self._dataset.shard(num_shards=self._args.comm_size, index=self._args.my_rank)
         self._dataset = self._dataset.batch(self.batch_size, drop_remainder=True)
         self._dataset = self._dataset.map(
             lambda x: tf.py_function(func=self._parse_image, inp=[x], Tout=[tf.uint8]),
             num_parallel_calls=self._args.computation_threads)
         self._dataset = self._dataset.repeat(self._args.epochs)
         total = math.ceil(len(self._file_list)/self._args.comm_size / self.batch_size * self._args.num_samples_per_file)
         return self._dataset.take(total*self._args.epochs).prefetch(buffer_size=self._args.prefetch_size)

     @dlp.log
     def read_index(self, image_idx, step):
         return super().read_index(image_idx, step)
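The reordering above moves the `shard` call from immediately after the `TFRecordDataset` is created to after the optional `shuffle`, so each rank now takes every `comm_size`-th element of the shuffled stream. A minimal standalone sketch of the resulting pipeline order (the `range` source and the `comm_size`/`my_rank` values are illustrative stand-ins, not DLIO code):

```python
import tensorflow as tf

# Illustrative stand-ins for the MPI values DLIO passes in.
comm_size, my_rank = 4, 1

dataset = tf.data.Dataset.range(16)                           # stand-in for TFRecordDataset
dataset = dataset.shuffle(buffer_size=16, seed=42)            # shuffle the full stream first
dataset = dataset.shard(num_shards=comm_size, index=my_rank)  # then keep every comm_size-th element
dataset = dataset.batch(2, drop_remainder=True)

for batch in dataset:
    print(batch.numpy())  # this rank's batches, drawn from the shuffled stream
```

One consequence worth noting: with shuffle-before-shard, every rank still iterates the full shuffled stream and discards the elements belonging to other ranks, trading extra reads for a shuffle over the whole dataset rather than over a single pre-cut shard.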
dlio_benchmark/utils/statscounter.py (16 changes: 3 additions & 13 deletions)
@@ -142,10 +142,6 @@ def end_run(self):
         self.summary['epochs'] = len(train_au)
         self.summary['metric']['train_au_percentage'] = list(train_au)
         self.summary['metric']['train_au_mean_percentage'] = np.mean(train_au)
-        if self.summary['metric']['train_au_mean_percentage'] >=90:
-            self.summary['metric']['train_au_meet_expectation'] = 'success'
-        else:
-            self.summary['metric']['train_au_meet_expectation'] = 'fail'
         self.summary['metric']['train_au_stdev_percentage'] = np.std(train_au)
         self.summary['metric']['train_throughput_samples_per_second'] = list(train_throughput)
         self.summary['metric']['train_throughput_mean_samples_per_second'] = np.mean(train_throughput)
@@ -157,10 +153,6 @@ def end_run(self):
         eval_throughput = self.comm.allreduce(self.eval_throughput)
         self.summary['metric']['eval_au_percentage'] = list(eval_au)
         self.summary['metric']['eval_au_mean_percentage'] = np.mean(eval_au)
-        if self.summary['metric']['eval_au_mean_percentage'] >=90:
-            self.summary['metric']['eval_au_meet_expectation'] = 'success'
-        else:
-            self.summary['metric']['eval_au_meet_expectation'] = 'fail'
         self.summary['metric']['eval_au_stdev_percentage'] = np.std(eval_au)
         self.summary['metric']['eval_throughput_samples_per_second'] = list(eval_throughput)
         self.summary['metric']['eval_throughput_mean_samples_per_second'] = np.mean(eval_throughput)
@@ -174,13 +166,11 @@ def end_run(self):
         metric = metric + f"[METRIC] Training Accelerator Utilization [AU] (%): {np.mean(train_au):.4f} ({np.std(train_au):.4f})\n"
         metric = metric + f"[METRIC] Training Throughput (samples/second): {np.mean(train_throughput):.4f} ({np.std(train_throughput):.4f})\n"
         metric = metric + f"[METRIC] Training I/O Throughput (MB/second): {np.mean(train_throughput)*self.record_size/1024/1024:.4f} ({np.std(train_throughput)*self.record_size/1024/1024:.4f})\n"
-        metric = metric + f"[METRIC] train_au_meet_expectation: {self.summary['metric']['train_au_meet_expectation']}\n"

         if self.args.do_eval:
             metric = metric + f"[METRIC] Eval Accelerator Utilization [AU] (%): {np.mean(eval_au):.4f} ({np.std(eval_au):.4f})\n"
             metric = metric + f"[METRIC] Eval Throughput (samples/second): {np.mean(eval_throughput):.6f} ({np.std(eval_throughput):.6f})\n"
             metric = metric + f"[METRIC] Eval Throughput (MB/second): {np.mean(eval_throughput)*self.record_size/1024/1024:.6f} ({np.std(eval_throughput)*self.record_size/1024/1024:.6f})\n"
-            metric = metric + f"[METRIC] eval_au_meet_expectation: {self.summary['metric']['eval_au_meet_expectation']}\n"
         metric += "[METRIC] ==========================================================\n"
         logging.info(metric)
     def start_train(self, epoch):
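The deleted lines above drop the hard-coded 90% AU pass/fail verdict from both the summary and the [METRIC] log. The mean AU still lands in the summary, so a consumer that wants the old check can reapply it externally; below is a hedged sketch assuming the summary is saved as JSON with the keys shown in this diff (the "summary.json" path and the threshold are carried over from the removed code, not a DLIO API):

```python
import json

AU_THRESHOLD = 90.0  # the expectation this commit removed

# "summary.json" is an assumed output path; the keys match this diff.
with open("summary.json") as f:
    summary = json.load(f)

au_mean = summary["metric"]["train_au_mean_percentage"]
verdict = "success" if au_mean >= AU_THRESHOLD else "fail"
print(f"train AU {au_mean:.2f}% -> {verdict}")
```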
@@ -331,13 +321,13 @@ def batch_processed(self, epoch, step, block):

     def compute_metrics_train(self, epoch, block):
         key = f"block{block}"
-        total_compute_time = np.sum(self.output[epoch]['compute'][key][1:])
+        total_compute_time = np.sum(self.output[epoch]['compute'][key][1:-1])
         if (total_compute_time==0):
             au=0.0
         else:
-            total_time = self.end_timestamp - self.start_timestamp - self.output[epoch]['proc'][key][0]
+            total_time = self.end_timestamp - self.start_timestamp - self.output[epoch]['proc'][key][0] - self.output[epoch]['proc'][key][-1]
             au = total_compute_time / total_time
-        throughput = len(self.output[epoch]['compute'][key])/(self.end_timestamp - self.start_timestamp)*self.batch_size
+        throughput = (len(self.output[epoch]['compute'][key]) - 2)/(total_time)*self.batch_size
         self.output[epoch]['au'][key] = au*100
         self.output[epoch]['throughput'][key] = throughput

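The updated formulas trim the first and last step of each block out of both the compute-time numerator and the wall-clock window, treating them as warmup and teardown, and throughput is now computed over that same trimmed window rather than the raw start-to-end span. A self-contained sketch with made-up timings (the arrays mirror the per-block `compute` and `proc` entries in this diff):

```python
import numpy as np

# Made-up per-step timings for one block: 'compute' is accelerator time per
# step, 'proc' is total step time (I/O + compute); the first and last steps
# are the warmup/teardown the new formulas exclude.
compute = np.array([0.30, 0.10, 0.10, 0.10, 0.05])
proc    = np.array([0.50, 0.12, 0.12, 0.12, 0.40])
batch_size = 32
start, end = 0.0, float(np.sum(proc))  # pretend wall-clock window

total_compute_time = np.sum(compute[1:-1])             # drop first and last steps
total_time = end - start - proc[0] - proc[-1]          # drop their wall time too
au = total_compute_time / total_time
throughput = (len(compute) - 2) / total_time * batch_size

print(f"AU = {au*100:.2f}%  throughput = {throughput:.1f} samples/s")
# -> AU = 83.33%  throughput = 266.7 samples/s
```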
