Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Native Dali Data Loader support for TFRecord, Images, and NPZ files #118

Merged
merged 39 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
00a253d
fixed readthedoc build issue
zhenghh04 Nov 22, 2023
256cecf
partial merged the following PR: https://github.com/argonne-lcf/dlio_…
zhenghh04 Dec 1, 2023
63cb1c1
added back npz_reader
zhenghh04 Dec 1, 2023
cdd7389
fixed bugs
zhenghh04 Dec 1, 2023
4f0008e
fixed bugs
zhenghh04 Dec 1, 2023
fbc9748
fixed image reader issue
zhenghh04 Dec 1, 2023
9bde93c
fixed Profile, PerfTrace
zhenghh04 Dec 6, 2023
c976bc5
removed unnecessary logs
zhenghh04 Dec 6, 2023
58febf3
fixed dali_image_reader
zhenghh04 Dec 6, 2023
3cee0fd
fixed dali_image_reader
zhenghh04 Dec 6, 2023
a666d5c
added support for npy format
zhenghh04 Dec 6, 2023
f0722b6
added support for npy format
zhenghh04 Dec 6, 2023
7155d5d
changed enumerations
zhenghh04 Dec 8, 2023
dcd9855
Merge branch 'dali' of github.com:argonne-lcf/dlio_benchmark into dali
zhenghh04 Dec 8, 2023
9935840
added removed dali base reader
zhenghh04 Dec 11, 2023
5dc3907
fixed a bug
zhenghh04 Dec 11, 2023
3fb3602
added native-dali-loader tests in github action
zhenghh04 Dec 11, 2023
248cfa2
corrected github action formats
zhenghh04 Dec 11, 2023
d2af6a3
Merge branch 'main' into dali
zhenghh04 Dec 11, 2023
344298d
fixed read return
zhenghh04 Dec 11, 2023
d983e6b
Merge branch 'dali' of github.com:argonne-lcf/dlio_benchmark into dali
zhenghh04 Dec 11, 2023
ac025eb
removed abstractmethod
zhenghh04 Dec 11, 2023
d2d544f
fixed bugs
zhenghh04 Dec 11, 2023
7312c01
added dont_use_mmap
zhenghh04 Dec 11, 2023
02c3855
fixed indent
zhenghh04 Dec 11, 2023
60c508c
fixed csvreader
zhenghh04 Dec 11, 2023
5e96841
native_dali test with npy format instead of npz
zhenghh04 Dec 11, 2023
b1412e3
fixed issue of enum
zhenghh04 Dec 11, 2023
d659e5f
modify action so that dlio will always be installed
zhenghh04 Dec 11, 2023
96fa9c3
[skip ci] added documentation for dali
zhenghh04 Dec 11, 2023
f9aaac2
removed read; and define it as pipeline
zhenghh04 Dec 12, 2023
ca760fe
added exceptions for unimplemented methods
zhenghh04 Dec 12, 2023
a8ba464
added preprocessing
zhenghh04 Dec 14, 2023
1f03159
conditional cache for DLIO installation
zhenghh04 Dec 14, 2023
5dd6ebf
fixed bugs
zhenghh04 Dec 14, 2023
6af315b
fixed bugs
zhenghh04 Dec 14, 2023
8dc2b71
fixed bugs
zhenghh04 Dec 14, 2023
7e72ed5
fixing again
zhenghh04 Dec 14, 2023
4e727a8
tests again
zhenghh04 Dec 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .readthedocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ sphinx:
# Optional but recommended, declare the Python requirements required
# to build your documentation
# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html
# python:
# install:
# - requirements: docs/requirements.txt
python:
install:
- requirements: docs/requirements.txt
4 changes: 4 additions & 0 deletions dlio_benchmark/common/enumerations.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class FormatType(Enum):
HDF5 = 'hdf5'
CSV = 'csv'
NPZ = 'npz'
NPY = 'npy'
HDF5_OPT = 'hdf5_opt'
JPEG = 'jpeg'
PNG = 'png'
Expand All @@ -110,6 +111,8 @@ def get_enum(value):
return FormatType.CSV
elif FormatType.NPZ.value == value:
return FormatType.NPZ
elif FormatType.NPY.value == value:
return FormatType.NPY
elif FormatType.HDF5_OPT.value == value:
return FormatType.HDF5_OPT
elif FormatType.JPEG.value == value:
Expand All @@ -124,6 +127,7 @@ class DataLoaderType(Enum):
TENSORFLOW='tensorflow'
PYTORCH='pytorch'
DALI='dali'
NATIVE_DALI = 'native_dali'
CUSTOM='custom'
NONE='none'

Expand Down
3 changes: 3 additions & 0 deletions dlio_benchmark/data_generator/generator_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def get_generator(type):
elif type == FormatType.NPZ:
from dlio_benchmark.data_generator.npz_generator import NPZGenerator
return NPZGenerator()
elif type == FormatType.NPY:
from dlio_benchmark.data_generator.npy_generator import NPYGenerator
return NPYGenerator()
elif type == FormatType.JPEG:
from dlio_benchmark.data_generator.jpeg_generator import JPEGGenerator
return JPEGGenerator()
Expand Down
15 changes: 14 additions & 1 deletion dlio_benchmark/data_generator/tf_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
from subprocess import call

from dlio_benchmark.data_generator.data_generator import DataGenerator
import numpy as np
import tensorflow as tf
from dlio_benchmark.utils.utility import progress, utcnow
from dlio_profiler.logger import fn_interceptor as Profile

from dlio_benchmark.utils.utility import progress, utcnow
from shutil import copyfile
from dlio_benchmark.common.constants import MODULE_DATA_GENERATOR

Expand Down Expand Up @@ -64,4 +67,14 @@ def generate(self):
serialized = example.SerializeToString()
# Write the serialized data to the TFRecords file.
writer.write(serialized)
tfrecord2idx_script = "tfrecord2idx"
folder = "train"
if "valid" in out_path_spec:
folder = "valid"
index_folder = f"{self._args.data_folder}/index/{folder}"
filename = os.path.basename(out_path_spec)
self.storage.create_node(index_folder, exist_ok=True)
tfrecord_idx = f"{index_folder}/{filename}.idx"
if not os.path.isfile(tfrecord_idx):
call([tfrecord2idx_script, out_path_spec, tfrecord_idx])
np.random.seed()
3 changes: 3 additions & 0 deletions dlio_benchmark/data_loader/data_loader_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def get_loader(type, format_type, dataset_type, epoch):
elif type == DataLoaderType.DALI:
from dlio_benchmark.data_loader.dali_data_loader import DaliDataLoader
return DaliDataLoader(format_type, dataset_type, epoch)
elif type == DataLoaderType.NATIVE_DALI:
from dlio_benchmark.data_loader.native_dali_data_loader import NativeDaliDataLoader
return NativeDaliDataLoader(format_type, dataset_type, epoch)
else:
print("Data Loader %s not supported or plugins not found" % type)
raise Exception(str(ErrorCodes.EC1004))
60 changes: 60 additions & 0 deletions dlio_benchmark/data_loader/native_dali_data_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from time import time
import logging
import math
import numpy as np
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import nvidia.dali as dali
from nvidia.dali.plugin.pytorch import DALIGenericIterator

from dlio_benchmark.common.constants import MODULE_DATA_LOADER
from dlio_benchmark.common.enumerations import Shuffle, DataLoaderType, DatasetType
from dlio_benchmark.data_loader.base_data_loader import BaseDataLoader
from dlio_benchmark.reader.reader_factory import ReaderFactory
from dlio_benchmark.utils.utility import utcnow, get_rank, timeit
from dlio_profiler.logger import dlio_logger as PerfTrace, fn_interceptor as Profile

dlp = Profile(MODULE_DATA_LOADER)


class NativeDaliDataLoader(BaseDataLoader):
@dlp.log_init
def __init__(self, format_type, dataset_type, epoch):
super().__init__(format_type, dataset_type, epoch, DataLoaderType.NATIVE_DALI)
self.pipelines = []

@dlp.log
def read(self):
num_samples = self._args.total_samples_train if self.dataset_type is DatasetType.TRAIN else self._args.total_samples_eval
batch_size = self._args.batch_size if self.dataset_type is DatasetType.TRAIN else self._args.batch_size_eval
parallel = True if self._args.read_threads > 0 else False
self.pipelines = []
num_threads = 1
if self._args.read_threads > 0:
num_threads = self._args.read_threads
# None executes pipeline on CPU and the reader does the batching
pipeline = Pipeline(batch_size=batch_size, num_threads=num_threads, device_id=None, py_num_workers=num_threads,
exec_async=False, exec_pipelined=False)
with pipeline:
images = ReaderFactory.get_reader(type=self.format_type,
dataset_type=self.dataset_type,
thread_index=-1,
epoch_number=self.epoch_number).read()
hariharan-devarajan marked this conversation as resolved.
Show resolved Hide resolved
pipeline.set_outputs(images)
self.pipelines.append(pipeline)

@dlp.log
def next(self):
super().next()
num_samples = self._args.total_samples_train if self.dataset_type is DatasetType.TRAIN else self._args.total_samples_eval
batch_size = self._args.batch_size if self.dataset_type is DatasetType.TRAIN else self._args.batch_size_eval
for step in range(num_samples // batch_size):
_dataset = DALIGenericIterator(self.pipelines, ['data'])
for batch in _dataset:
logging.debug(f"{utcnow()} Creating {len(batch)} batches by {self._args.my_rank} rank ")
yield batch

@dlp.log
def finalize(self):
pass
65 changes: 65 additions & 0 deletions dlio_benchmark/reader/dali_base_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Copyright (c) 2022, UChicago Argonne, LLC
zhenghh04 marked this conversation as resolved.
Show resolved Hide resolved
All Rights Reserved
zhenghh04 marked this conversation as resolved.
Show resolved Hide resolved

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from time import sleep

import numpy as np
import nvidia
from abc import ABC, abstractmethod
from nvidia import dali

from dlio_benchmark.common.constants import MODULE_DATA_READER
from dlio_benchmark.common.enumerations import DatasetType
from dlio_benchmark.utils.config import ConfigArguments
from dlio_profiler.logger import dlio_logger as PerfTrace, fn_interceptor as Profile

from nvidia.dali import fn
dlp = Profile(MODULE_DATA_READER)

class DaliBaseReader(ABC):

@dlp.log_init
def __init__(self, dataset_type):
self.dataset_type = dataset_type
self._args = ConfigArguments.get_instance()
self.batch_size = self._args.batch_size if self.dataset_type is DatasetType.TRAIN else self._args.batch_size_eval
self.file_list = self._args.file_list_train if self.dataset_type is DatasetType.TRAIN else self._args.file_list_eval

@dlp.log
def _preprocess(self, dataset):
if self._args.preprocess_time != 0. or self._args.preprocess_time_stdev != 0.:
t = np.random.normal(self._args.preprocess_time, self._args.preprocess_time_stdev)
sleep(max(t, 0.0))
return dataset

@dlp.log
def _resize(self, dataset):
return nvidia.dali.fn.reshape(dataset, shape=[self._args.max_dimension, self._args.max_dimension])

@abstractmethod
def _load(self):
pass

@dlp.log
def read(self):
dataset = self._load()
#dataset = self._resize(dataset)
#dataset = nvidia.dali.fn.python_function(dataset, function= self._preprocess, num_outputs=1)
return dataset

@abstractmethod
def finalize(self):
pass
69 changes: 69 additions & 0 deletions dlio_benchmark/reader/dali_image_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Copyright (c) 2022, UChicago Argonne, LLC
All Rights Reserved

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import math
import logging
from time import time

import nvidia.dali.fn as fn
from dlio_benchmark.common.constants import MODULE_DATA_READER
from dlio_benchmark.reader.dali_base_reader import DaliBaseReader
from dlio_benchmark.utils.utility import utcnow
from dlio_benchmark.common.enumerations import DatasetType, Shuffle
import nvidia.dali.tfrecord as tfrec
from dlio_profiler.logger import dlio_logger as PerfTrace, fn_interceptor as Profile

dlp = Profile(MODULE_DATA_READER)


class DaliImageReader(DaliBaseReader):
@dlp.log_init
def __init__(self, dataset_type):
super().__init__(dataset_type)

@dlp.log
def _load(self):
logging.debug(
zhenghh04 marked this conversation as resolved.
Show resolved Hide resolved
f"{utcnow()} Reading {len(self.file_list)} files rank {self._args.my_rank}")
random_shuffle = False
seed = -1
seed_change_epoch = False
if self._args.sample_shuffle is not Shuffle.OFF:
if self._args.sample_shuffle is not Shuffle.SEED:
seed = self._args.seed
random_shuffle = True
seed_change_epoch = True
initial_fill = 1024
if self._args.shuffle_size > 0:
initial_fill = self._args.shuffle_size
prefetch_size = 1
if self._args.prefetch_size > 0:
prefetch_size = self._args.prefetch_size

stick_to_shard = True
if seed_change_epoch:
stick_to_shard = False
images, labels = fn.readers.file(files=self.file_list, num_shards=self._args.comm_size,
prefetch_queue_depth=prefetch_size,
initial_fill=initial_fill, random_shuffle=random_shuffle,
shuffle_after_epoch=seed_change_epoch,
stick_to_shard=stick_to_shard, pad_last_batch=True)
dataset = fn.decoders.image(images, device='cpu')
hariharan-devarajan marked this conversation as resolved.
Show resolved Hide resolved
return dataset

@dlp.log
def finalize(self):
pass
hariharan-devarajan marked this conversation as resolved.
Show resolved Hide resolved
68 changes: 68 additions & 0 deletions dlio_benchmark/reader/dali_npy_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
Copyright (c) 2022, UChicago Argonne, LLC
All Rights Reserved

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import math
import logging
from time import time

import nvidia.dali.fn as fn
from dlio_benchmark.common.constants import MODULE_DATA_READER
from dlio_benchmark.reader.dali_base_reader import DaliBaseReader
from dlio_benchmark.utils.utility import utcnow
from dlio_benchmark.common.enumerations import DatasetType, Shuffle
import nvidia.dali.tfrecord as tfrec
from dlio_profiler.logger import dlio_logger as PerfTrace, fn_interceptor as Profile

dlp = Profile(MODULE_DATA_READER)


class DaliNPYReader(DaliBaseReader):
@dlp.log_init
def __init__(self, dataset_type):
super().__init__(dataset_type)

@dlp.log
def _load(self):
logging.debug(
zhenghh04 marked this conversation as resolved.
Show resolved Hide resolved
f"{utcnow()} Reading {len(self.file_list)} files rank {self._args.my_rank}")
random_shuffle = False
seed = -1
seed_change_epoch = False
if self._args.sample_shuffle is not Shuffle.OFF:
if self._args.sample_shuffle is not Shuffle.SEED:
seed = self._args.seed
random_shuffle = True
seed_change_epoch = True
initial_fill = 1024
if self._args.shuffle_size > 0:
initial_fill = self._args.shuffle_size
prefetch_size = 1
if self._args.prefetch_size > 0:
prefetch_size = self._args.prefetch_size

stick_to_shard = True
if seed_change_epoch:
stick_to_shard = False

dataset = fn.readers.numpy(device='cpu', files=self.file_list, num_shards=self._args.comm_size,
prefetch_queue_depth=prefetch_size, initial_fill=initial_fill,
random_shuffle=random_shuffle, seed=seed, shuffle_after_epoch=seed_change_epoch,
stick_to_shard=stick_to_shard, pad_last_batch=True)
return dataset

@dlp.log
def finalize(self):
pass
Loading