diff --git a/dlio_benchmark/reader/tf_reader.py b/dlio_benchmark/reader/tf_reader.py index 61eb8c10..493cde2d 100644 --- a/dlio_benchmark/reader/tf_reader.py +++ b/dlio_benchmark/reader/tf_reader.py @@ -67,7 +67,7 @@ def _parse_image(self, serialized): 'image': tf.io.FixedLenFeature([], tf.string), 'size': tf.io.FixedLenFeature([], tf.int64) } - parsed_example = tf.io.parse_example(serialized=serialized, features=features) + #parsed_example = tf.io.parse_example(serialized=serialized, features=features) # Get the image as raw bytes. #image_raw = parsed_example['image'] #dimension = tf.cast(parsed_example['size'], tf.int32).numpy() @@ -94,10 +94,20 @@ def next(self): self._dataset = self._dataset.shuffle(buffer_size=self._args.shuffle_size) self._dataset = self._dataset.shard(num_shards=self._args.comm_size, index=self._args.my_rank) - self._dataset = self._dataset.batch(self.batch_size, drop_remainder=True) - self._dataset = self._dataset.map( - lambda x: tf.py_function(func=self._parse_image, inp=[x], Tout=[tf.uint8]), - num_parallel_calls=self._args.computation_threads) + if self._args.computation_threads==0: + self._dataset = self._dataset.batch(self.batch_size, drop_remainder=True) + else: + if self._args.computation_threads <= self.batch_size: + self._dataset = self._dataset.batch(self.batch_size, drop_remainder=True) + self._dataset = self._dataset.map( + lambda x: tf.py_function(func=self._parse_image, inp=[x], Tout=[tf.uint8]), + num_parallel_calls=self._args.computation_threads) + else: + self._dataset = self._dataset.batch(self._args.computation_threads) + self._dataset = self._dataset.map( + lambda x: tf.py_function(func=self._parse_image, inp=[x], Tout=[tf.uint8]), + num_parallel_calls=self._args.computation_threads) + self._dataset = self._dataset.unbatch().batch(self.batch_size, drop_remainder=True) self._dataset = self._dataset.repeat(self._args.epochs) total = math.ceil(len(self._file_list)/self._args.comm_size / self.batch_size * self._args.num_samples_per_file) return 
self._dataset.take(total*self._args.epochs).prefetch(buffer_size=self._args.prefetch_size) diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index d2093754..d0929a91 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -461,8 +461,8 @@ def LoadConfig(args, config): args.data_loader_sampler = DataLoaderSampler(reader['data_loader_sampler']) if 'read_threads' in reader: args.read_threads = reader['read_threads'] - if 'computatation_threads' in reader: - args.computatation_threads = reader['computatation_threads'] + if 'computation_threads' in reader: + args.computation_threads = reader['computation_threads'] if 'batch_size' in reader: args.batch_size = reader['batch_size'] if 'batch_size_eval' in reader: diff --git a/dlio_benchmark/utils/statscounter.py b/dlio_benchmark/utils/statscounter.py index 84caff51..97ceea98 100644 --- a/dlio_benchmark/utils/statscounter.py +++ b/dlio_benchmark/utils/statscounter.py @@ -280,6 +280,7 @@ def end_block(self, epoch, block, steps_taken): self.per_epoch_stats[epoch][f'block{block}']['duration'] = duration logging.info(f"{utcnow()} Epoch {epoch} - Block {block} [Training] Accelerator Utilization [AU] (%): {self.output[epoch]['au'][f'block{block}']:.4f}") logging.info(f"{utcnow()} Epoch {epoch} - Block {block} [Training] Throughput (samples/second): {self.output[epoch]['throughput'][f'block{block}']*self.comm_size:.4f}") + logging.info(f"{utcnow()} Epoch {epoch} - Block {block} [Training] Computation time per step (second): {np.mean(self.output[epoch]['compute'][f'block{block}'][1:-1]):.4f}+/-{np.std(self.output[epoch]['compute'][f'block{block}'][1:-1]):.4f} (set value: {self.args.computation_time})") def start_ckpt(self, epoch, block, steps_taken): if self.my_rank == 0: @@ -336,11 +337,10 @@ def compute_metrics_train(self, epoch, block): throughput = (len(self.output[epoch]['compute'][key]) - 2)/(total_time)*self.batch_size self.output[epoch]['au'][key] = au*100 
self.output[epoch]['throughput'][key] = throughput - self.output[epoch]['compute'][key] = total_compute_time def compute_metrics_eval(self, epoch): key = 'eval' - total_compute_time = np.sum(self.output[epoch]['compute'][key][1:]) + total_compute_time = np.sum(self.output[epoch]['compute'][key][1:-1]) if (total_compute_time==0): au=0.0 else: @@ -349,19 +349,16 @@ def compute_metrics_eval(self, epoch): throughput = len(self.output[epoch]['compute'][key])/(self.end_timestamp - self.start_timestamp)*self.batch_size_eval self.output[epoch]['au'][key] = au*100 self.output[epoch]['throughput'][key] = throughput - self.output[epoch]['compute'][key] = total_compute_time - def eval_batch_loaded(self, epoch, step): duration = time() - self.start_time_loading self.output[epoch]['load']['eval'].append(duration) logging.debug(f"{utcnow()} Rank {self.my_rank} step {step} loaded {self.batch_size_eval} samples in {duration} s") - def eval_batch_processed(self, epoch, step): current_time = time() duration = current_time - self.start_time_loading - computation_time = current_time -self.start_time_compute + computation_time = current_time - self.start_time_compute self.output[epoch]['proc']['eval'].append(duration) self.output[epoch]['compute']['eval'].append(computation_time) logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size_eval} samples in {duration} s")