Skip to content

Commit

Permalink
For sample indexing we fix the uneven sampling (#226)
Browse files Browse the repository at this point in the history
* For sample indexing we fix the uneven sampling

1. Fix uneven sampling done for index based and iterative
2. Add a validation step to ensure we can validate that global indices are correctly shuffled and no indices are lost.
3. Make sure we do file and sample shuffling in reconfigure step.
4. Remove sample shuffling from dataloader Sampler code.
5. Added test case to support uneven file distributions #225

* Increase GOTCHA and DFTRACER LEVEL to reduce print

* Increasing test case to improve testing

* reduced reader threads asnot enough data

* ensure the sampler do not goes past the file in the last rank.
  • Loading branch information
hariharan-devarajan authored Sep 24, 2024
1 parent bbe9bae commit 3732663
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 123 deletions.
94 changes: 62 additions & 32 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,35 +103,65 @@ jobs:
- name: test_train
run: |
source ${VENV_PATH}/bin/activate
mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[png-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[png-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[png-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[png-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[png-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[png-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali-False] -v
rm -rf data
- name: test_custom_storage_root_train
run: |
Expand Down Expand Up @@ -227,9 +257,9 @@ jobs:
run: |
source ${VENV_PATH}/bin/activate
rm -rf output data checkpoints
mpirun -np 2 ${DLIO_EXEC} workload=resnet50_a100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.reader.read_threads=1
mpirun -np 2 ${DLIO_EXEC} workload=resnet50_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.reader.read_threads=1
mpirun -np 2 ${DLIO_EXEC} workload=resnet50_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.reader.read_threads=1 ++workload.dataset.format=synthetic
mpirun -np 2 ${DLIO_EXEC} workload=resnet50_a100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=8 ++workload.reader.read_threads=1
mpirun -np 2 ${DLIO_EXEC} workload=resnet50_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=8 ++workload.reader.read_threads=1
mpirun -np 2 ${DLIO_EXEC} workload=resnet50_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=8 ++workload.reader.read_threads=1 ++workload.dataset.format=synthetic
rm -rf data
- name: test_cosmoflow
run: |
Expand Down
31 changes: 9 additions & 22 deletions dlio_benchmark/data_loader/dali_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
class DaliIndexDataset(object):

def __init__(self, format_type, dataset_type, epoch, worker_index,
total_num_workers, total_num_samples, samples_per_worker, batch_size, shuffle=Shuffle.OFF, seed=1234):
total_num_workers, total_num_samples, samples_per_worker, batch_size):
self.format_type = format_type
self.dataset_type = dataset_type
self.epoch = epoch
Expand All @@ -44,22 +44,19 @@ def __init__(self, format_type, dataset_type, epoch, worker_index,
self.samples_per_worker = samples_per_worker
self.batch_size = batch_size
self.worker_index = worker_index
self.shuffle = shuffle
self.total_num_steps = self.samples_per_worker//batch_size
self.reader = ReaderFactory.get_reader(type=self.format_type,
dataset_type=self.dataset_type,
thread_index=worker_index,
epoch_number=self.epoch)
assert(self.reader.is_index_based())
self.seed = seed
start_sample = self.worker_index * samples_per_worker
end_sample = (self.worker_index + 1) * samples_per_worker
end_sample = (self.worker_index + 1) * samples_per_worker - 1
if end_sample > total_num_samples - 1:
end_sample = total_num_samples - 1
if not hasattr(self, 'indices'):
self.indices = list(range(start_sample, end_sample))
if self.shuffle != Shuffle.OFF:
if self.shuffle == Shuffle.SEED:
np.random.seed(self.seed)
np.random.shuffle(self.indices)
self.indices = list(range(start_sample, end_sample + 1))
self.samples_per_worker = len(self.indices)
def __call__(self, sample_info):
logging.debug(
f"{utcnow()} Reading {sample_info.idx_in_epoch} out of {self.samples_per_worker} by worker {self.worker_index} with {self.indices} indices")
Expand All @@ -74,7 +71,7 @@ def __call__(self, sample_info):

class DaliIteratorDataset(object):
def __init__(self, format_type, dataset_type, epoch, worker_index,
total_num_workers, total_num_samples, samples_per_worker, batch_size, shuffle=Shuffle.OFF, seed=1234):
total_num_workers, total_num_samples, samples_per_worker, batch_size):
self.format_type = format_type
self.dataset_type = dataset_type
self.epoch = epoch
Expand All @@ -83,22 +80,12 @@ def __init__(self, format_type, dataset_type, epoch, worker_index,
self.samples_per_worker = samples_per_worker
self.batch_size = batch_size
self.worker_index = worker_index
self.shuffle = shuffle
self.total_num_steps = self.samples_per_worker//batch_size
self.reader = ReaderFactory.get_reader(type=self.format_type,
dataset_type=self.dataset_type,
thread_index=worker_index,
epoch_number=self.epoch)
assert(self.reader.is_iterator_based())
self.seed = seed
start_sample = self.worker_index * samples_per_worker
end_sample = (self.worker_index + 1) * samples_per_worker
if not hasattr(self, 'indices'):
self.indices = list(range(start_sample, end_sample))
if self.shuffle != Shuffle.OFF:
if self.shuffle == Shuffle.SEED:
np.random.seed(self.seed)
np.random.shuffle(self.indices)
def __iter__(self):
with Profile(MODULE_DATA_LOADER):
for image in self.reader.next():
Expand All @@ -124,12 +111,12 @@ def read(self, init=False):
if self._args.prefetch_size > 0:
prefetch_size = self._args.prefetch_size
num_pipelines = 1
samples_per_worker = self.num_samples // num_pipelines // self._args.comm_size
samples_per_worker = int(math.ceil(self.num_samples/num_pipelines/self._args.comm_size))
for worker_index in range(num_pipelines):
global_worker_index = self._args.my_rank * num_pipelines + worker_index
# None executes pipeline on CPU and the reader does the batching
self.dataset = DaliIndexDataset(self.format_type, self.dataset_type, self.epoch_number, global_worker_index,
self._args.comm_size * num_pipelines, self.num_samples, samples_per_worker, 1, self._args.sample_shuffle, self._args.seed)
self._args.comm_size * num_pipelines, self.num_samples, samples_per_worker, 1)
pipeline = Pipeline(batch_size=self.batch_size, num_threads=num_threads, device_id=None, py_num_workers=num_threads//num_pipelines,
prefetch_queue_depth=prefetch_size, py_start_method=self._args.multiprocessing_context, exec_async=True)
with pipeline:
Expand Down
17 changes: 6 additions & 11 deletions dlio_benchmark/data_loader/torch_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,27 +85,23 @@ def __getitem__(self, image_idx):


class dlio_sampler(Sampler):
def __init__(self, rank, size, num_samples, shuffle, epochs, seed):
def __init__(self, rank, size, num_samples, epochs):
self.size = size
self.rank = rank
self.num_samples = num_samples
self.shuffle = shuffle
self.epochs = epochs
self.seed = seed
samples_per_proc = int(math.ceil(num_samples/size))
start_sample = self.rank * samples_per_proc
end_sample = (self.rank + 1) * samples_per_proc
self.indices = list(range(start_sample, end_sample))
end_sample = (self.rank + 1) * samples_per_proc - 1
if end_sample > num_samples - 1:
end_sample = num_samples - 1
self.indices = list(range(start_sample, end_sample + 1))


def __len__(self):
return self.num_samples

def __iter__(self):
if self.shuffle != Shuffle.OFF:
if self.shuffle == Shuffle.SEED:
np.random.seed(self.seed)
np.random.shuffle(self.indices)
for sample in self.indices:
yield sample

Expand All @@ -118,8 +114,7 @@ def __init__(self, format_type, dataset_type, epoch_number):
def read(self):
dataset = TorchDataset(self.format_type, self.dataset_type, self.epoch_number, self.num_samples,
self._args.read_threads, self.batch_size)
sampler = dlio_sampler(self._args.my_rank, self._args.comm_size, self.num_samples, self._args.sample_shuffle,
self._args.epochs, self._args.seed)
sampler = dlio_sampler(self._args.my_rank, self._args.comm_size, self.num_samples, self._args.epochs)
if self._args.read_threads >= 1:
prefetch_factor = math.ceil(self._args.prefetch_size / self._args.read_threads)
else:
Expand Down
Loading

0 comments on commit 3732663

Please sign in to comment.