New improved modelling for LLM Deepspeed. #230

Open
wants to merge 13 commits into base: main
28 changes: 14 additions & 14 deletions .github/workflows/ci.yml
@@ -78,6 +78,20 @@ jobs:
source ${VENV_PATH}/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
- name: test_checkpoint_epoch
run: |
source ${VENV_PATH}/bin/activate
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers0-2-layer_params0-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers1-2-layer_params1-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers2-2-layer_params2-rank_zero] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers3-2-layer_params3-rank_zero] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers4-1-layer_params4-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers5-1-layer_params5-all_ranks] -v
rm -rf data
- name: test_checkpoint_step
run: |
source ${VENV_PATH}/bin/activate
mpirun -np 2 pytest -k test_checkpoint_step -v
- name: test_gen_data
run: |
source ${VENV_PATH}/bin/activate
@@ -182,20 +196,6 @@ jobs:
mpirun -np 2 pytest -k test_custom_storage_root_train[mmap_indexed_binary-tensorflow] -v
mpirun -np 2 pytest -k test_custom_storage_root_train[mmap_indexed_binary-pytorch] -v
rm -rf data
- name: test_checkpoint_epoch
run: |
source ${VENV_PATH}/bin/activate
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers0-2-layer_params0-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers1-2-layer_params1-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers2-2-layer_params2-rank_zero] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers3-2-layer_params3-rank_zero] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers4-1-layer_params4-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers5-1-layer_params5-all_ranks] -v
rm -rf data
- name: test_checkpoint_step
run: |
source ${VENV_PATH}/bin/activate
mpirun -np 2 pytest -k test_checkpoint_step -v
- name: test_eval
run: |
source ${VENV_PATH}/bin/activate
40 changes: 29 additions & 11 deletions dlio_benchmark/checkpointing/base_checkpointing.py
@@ -15,6 +15,7 @@
limitations under the License.
"""
import os
import math
from abc import ABC, abstractmethod

from dlio_benchmark.common.enumerations import CheckpointLocationType
@@ -53,7 +54,7 @@ def __init__(self, ext):
self.layer_state = dict()
for index, state in enumerate(self.args.layer_parameters):
if state > 0:
self.layer_state[str(index)] = self.get_tensor(state)
self.layer_state[str(index)] = self.get_tensor(state // self.args.tensor_parallelism)

@abstractmethod
def get_tensor(self, size):
@@ -66,24 +67,41 @@ def save_state(self, suffix, state):
def get_name(self, suffix):
return os.path.join(self.args.checkpoint_folder, f"{suffix}.{self.ext}")

def get_layer_index(self, rank, tensor_parallelism, pipeline_parallelism, total_layers):
if tensor_parallelism > 1:
total_layers = total_layers + tensor_parallelism

divisible_layers = total_layers - (total_layers % pipeline_parallelism)
min_layers_per_pipeline = divisible_layers // pipeline_parallelism
max_layer_per_pipeline = min_layers_per_pipeline + 1
pipeline_rank = (rank // tensor_parallelism) % pipeline_parallelism
left_layers = total_layers - divisible_layers
num_layers_per_pipeline = max_layer_per_pipeline
if pipeline_rank >= left_layers:
num_layers_per_pipeline = min_layers_per_pipeline
if pipeline_rank < left_layers:
start_layer = pipeline_rank * max_layer_per_pipeline
end_layer = start_layer + num_layers_per_pipeline - 1
else:
start_layer = left_layers * max_layer_per_pipeline + (pipeline_rank - left_layers) * (min_layers_per_pipeline)
end_layer = start_layer + num_layers_per_pipeline - 1
return start_layer, end_layer

@abstractmethod
def checkpoint(self, epoch, step_number):
rank_to_checkpoint = DLIOMPI.get_instance().rank()
my_rank = DLIOMPI.get_instance().rank()
rank_to_checkpoint = my_rank
if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO:
rank_to_checkpoint = 0
if rank_to_checkpoint == DLIOMPI.get_instance().rank():
my_rank = DLIOMPI.get_instance().rank()
if rank_to_checkpoint == my_rank:
if self.model_state:
self.save_state(suffix=f"model-{epoch}-{step_number}-{my_rank}", state=self.model_state)
if self.optimization_state:
self.save_state(suffix=f"optimizer-{epoch}-{step_number}-{my_rank}", state=self.optimization_state)
if rank_to_checkpoint % self.args.pipeline_parallelism == 0:
if self.layer_state and self.args.num_layers > 0:
total_layers = self.args.num_layers
if self.args.tensor_parallelism > 1:
total_layers = total_layers + self.args.tensor_parallelism
for layer in range(total_layers):
self.save_state(suffix=f"layer-{layer}-{epoch}-{step_number}-{my_rank}", state=self.layer_state)

start_layer, end_layer = self.get_layer_index(my_rank, self.args.tensor_parallelism, self.args.pipeline_parallelism, self.args.num_layers)
for layer_index in range(start_layer, end_layer + 1):
self.save_state(suffix=f"layer-{layer_index}-{epoch}-{step_number}-{my_rank}", state=self.layer_state)

@abstractmethod
def finalize(self):
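For intuition, here is a condensed, standalone restatement of the new partitioning logic, with a hypothetical run of 8 ranks, tensor_parallelism=2, pipeline_parallelism=2, and 9 layers (all values illustrative, not from this PR):

def layer_range(rank, tp, pp, total_layers):
    # TP > 1 adds tp extra layer shards to distribute, as in get_layer_index.
    if tp > 1:
        total_layers += tp
    divisible = total_layers - (total_layers % pp)
    min_per_stage = divisible // pp
    left = total_layers - divisible          # stages that carry one extra layer
    stage = (rank // tp) % pp
    if stage < left:
        start, n = stage * (min_per_stage + 1), min_per_stage + 1
    else:
        start = left * (min_per_stage + 1) + (stage - left) * min_per_stage
        n = min_per_stage
    return start, start + n - 1

for rank in range(8):
    print(rank, layer_range(rank, tp=2, pp=2, total_layers=9))
# 9 layers + 2 TP shards = 11 in total; stage 0 (ranks 0,1,4,5) gets layers 0-5,
# stage 1 (ranks 2,3,6,7) gets layers 6-10.

Each rank then writes only the layer files in its own range, instead of every checkpointing rank writing all total_layers files as the old loop did.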
12 changes: 7 additions & 5 deletions dlio_benchmark/configs/workload/megatron_deepspeed.yaml
@@ -17,20 +17,22 @@ dataset:

reader:
data_loader: pytorch
batch_size: 1024
batch_size: 16
read_threads: 1
file_shuffle: seed
sample_shuffle: seed

train:
epochs: 311541
computation_time: 0.03 # every iteration has 290 steps and each iteration is 8.9 sec.
epochs: 3
computation_time: 2.44 # 2.44 sec per step

checkpoint:
checkpoint_folder: checkpoints/megatron-deepspeed
steps_between_checkpoints: 1000
model_size: 30102
type: all_ranks
optimization_groups: [1009254400, 865075200, 793600]
num_layers: 44
layer_parameters: [129761280, 20971520]
num_layers: 40
pipeline_parallelism: 8
tensor_parallelism: 4
layer_parameters: [52583936, 209715200]
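For a rough sense of scale under this config, a back-of-envelope sketch (4-byte fp32 elements are an assumption; the element size is not stated in the YAML):

layer_params = [52583936, 209715200]   # per-layer parameter counts from the YAML
tp = 4                                 # tensor_parallelism from the YAML
bytes_per_elem = 4                     # assumption: fp32
shard_bytes = sum(p // tp for p in layer_params) * bytes_per_elem
print(f"{shard_bytes / 2**20:.0f} MiB per layer file")   # ~250 MiB

With tensor_parallelism > 1 the model adds tp extra layer shards (44 in total here), so per get_layer_index the first four pipeline stages write six layer files per checkpoint and the remaining four write five.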
44 changes: 25 additions & 19 deletions dlio_benchmark/data_generator/indexed_binary_generator.py
@@ -54,7 +54,7 @@ def generate(self):
samples_processed = 0
total_samples = self.total_files_to_generate * self.num_samples
dim = self.get_dimension(self.total_files_to_generate)
logging.info(dim)
# logging.info(dim)
if self.total_files_to_generate <= self.comm_size:
# Use collective I/O
# we need an even number of samples for collective I/O
@@ -68,6 +68,30 @@
out_path_spec = self.storage.get_uri(self._file_list[file_index])
out_path_spec_off_idx = self.index_file_path_off(out_path_spec)
out_path_spec_sz_idx = self.index_file_path_size(out_path_spec)

if self.my_rank == 0:
logging.info(f"{utcnow()} Starting metadata generation. ")
fh_off = MPI.File.Open(comm, out_path_spec_off_idx, amode)
fh_sz = MPI.File.Open(comm, out_path_spec_sz_idx, amode)
off_type = np.uint64
elements_per_loop = min(int(MB / np.dtype(off_type).itemsize), samples_per_rank)
offsets_processed = 0
for element_index in range(self.my_rank*samples_per_rank, samples_per_rank*(self.my_rank+1), elements_per_loop):
    offsets = np.array(range(element_index * sample_size,
                             (element_index + elements_per_loop) * sample_size,
                             sample_size), dtype=off_type)

sizes = np.array([sample_size] * elements_per_loop, dtype=off_type)
offset = element_index * np.dtype(off_type).itemsize
fh_off.Write_at_all(offset, offsets)
fh_sz.Write_at_all(offset, sizes)
offsets_processed += elements_per_loop
progress(offsets_processed * self.comm_size, total_samples, "Generating Indexed Binary Data Index for Samples")
fh_off.Close()
fh_sz.Close()
if self.my_rank == 0:
logging.info(f"{utcnow()} Starting Sample generation. ")

fh = MPI.File.Open(comm, out_path_spec, amode)
samples_per_loop = int(MB * 16 / sample_size)

@@ -79,24 +103,6 @@
samples_processed += samples_per_loop
progress(samples_processed * self.comm_size, total_samples, "Generating Indexed Binary Data Samples")
fh.Close()
logging.info(f"{utcnow()} rank {self.my_rank} writing metadata")
off_file = open(out_path_spec_off_idx, "wb")
sz_file = open(out_path_spec_sz_idx, "wb")
if int(file_index / self.comm_size) == self.my_rank:
# Write offsets
myfmt = 'Q' * self.num_samples
data_to_write = self.num_samples * sample_size
samples_to_write = self.num_samples
offsets = range(0, data_to_write, sample_size)
offsets = offsets[:samples_to_write]
binary_offsets = struct.pack(myfmt, *offsets)
off_file.write(binary_offsets)

# Write sizes
myfmt = 'Q' * samples_to_write
sample_sizes = [sample_size] * samples_to_write
binary_sizes = struct.pack(myfmt, *sample_sizes)
sz_file.write(binary_sizes)
else:
for i in dlp.iter(range(self.my_rank, int(self.total_files_to_generate), self.comm_size)):
dim1 = dim[2*i]
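For reference, a minimal sketch of the collective index-write pattern introduced above, using mpi4py and numpy directly (sample_size and samples_per_rank are hypothetical; the real generator derives them from the workload config):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
sample_size, samples_per_rank = 4096, 1024   # illustrative values
amode = MPI.MODE_WRONLY | MPI.MODE_CREATE

fh = MPI.File.Open(comm, "data.off.idx", amode)
start = comm.rank * samples_per_rank
offsets = np.arange(start, start + samples_per_rank, dtype=np.uint64) * sample_size
# Every rank joins the collective write, each at its own byte offset,
# so the shared index file comes out globally ordered in one pass.
fh.Write_at_all(start * np.dtype(np.uint64).itemsize, offsets)
fh.Close()

This replaces the old struct.pack path, which wrote the index with plain Python file I/O after the samples were generated.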
33 changes: 23 additions & 10 deletions dlio_benchmark/reader/indexed_binary_mmap_reader.py
@@ -36,8 +36,8 @@ class IndexedBinaryMMapReader(FormatReader):
def __init__(self, dataset_type, thread_index, epoch):
super().__init__(dataset_type, thread_index)
self.file_map_ibr = {}
self.load_index()
self.buffer_map = {}
self.load_index()

def index_file_path_off(self, prefix_path):
return prefix_path + '.off.idx'
@@ -61,6 +61,9 @@ def load_index_file(self, global_sample_idx, filename, sample_index):
bin_buffer_mmap = np.memmap(sz_file, mode='r', order='C')
bin_buffer = memoryview(bin_buffer_mmap)
self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint64))
bin_buffer_mmap = np.memmap(filename, mode='r', order='C')
bin_buffer = memoryview(bin_buffer_mmap)
self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8)

@dlp.log
def load_index(self):
@@ -76,24 +79,20 @@ def load_index(self):

@dlp.log
def open(self, filename):
super().open(filename)
bin_buffer_mmap = np.memmap(filename, mode='r', order='C')
bin_buffer = memoryview(bin_buffer_mmap)
self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8)
return bin_buffer_mmap
super().open(filename)
return self.buffer_map[filename]

@dlp.log
def close(self, filename):
super().close(filename)
self.open_file_map[filename]._mmap.close()


@dlp.log
def get_sample(self, filename, sample_index):
super().get_sample(filename, sample_index)
buffer = self.buffer_map[filename]
offset = self.file_map_ibr[filename][0][sample_index]
size = self.file_map_ibr[filename][1][sample_index]
logging.debug(f"reading sample from offset {offset} of size {size} from file {filename}")
image = buffer[offset:offset+size]
dlp.update(image_size=size)

@@ -103,11 +102,25 @@ def next(self):

@dlp.log
def read_index(self, image_idx, step):
return super().read_index(image_idx, step)
filename, sample_index = self.global_index_map[image_idx]
self.get_sample(filename, sample_index)
self.preprocess()
return self._args.resized_image

@dlp.log
def finalize(self):
return super().finalize()
super().finalize()
if self._args.data_loader_sampler == DataLoaderSampler.ITERATIVE:
for global_sample_idx, filename, sample_index in self.file_map[self.thread_index]:
self.buffer_map[filename]._mmap.close()
self.file_map_ibr[filename][0]._mmap.close()
self.file_map_ibr[filename][1]._mmap.close()
elif self._args.data_loader_sampler == DataLoaderSampler.INDEX:
for global_sample_idx, (filename, sample_index) in self.global_index_map.items():
self.buffer_map[filename]._mmap.close()
self.file_map_ibr[filename][0]._mmap.close()
self.file_map_ibr[filename][1]._mmap.close()


def is_index_based(self):
return True
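The reader now maps each .bin file once while loading the index and serves get_sample() as a zero-copy slice, with finalize() closing all three maps. A minimal standalone sketch of that access pattern (file names are hypothetical):

import numpy as np

data, off_idx, sz_idx = "f.bin", "f.bin.off.idx", "f.bin.sz.idx"  # hypothetical

offsets = np.frombuffer(memoryview(np.memmap(off_idx, mode="r", order="C")), dtype=np.uint64)
sizes = np.frombuffer(memoryview(np.memmap(sz_idx, mode="r", order="C")), dtype=np.uint64)
buf = np.frombuffer(memoryview(np.memmap(data, mode="r", order="C")), dtype=np.uint8)

def get_sample(i):
    # Slicing touches only the pages backing this sample; nothing is copied.
    o, s = offsets[i], sizes[i]
    return buf[o:o + s]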
19 changes: 13 additions & 6 deletions tests/dlio_benchmark_test.py
@@ -228,8 +228,8 @@ def test_iostat_profiling() -> None:
("pytorch", 1024, [1024, 128], 2, [16], "all_ranks"),
("tensorflow", 1024, [1024, 128], 2, [16], "rank_zero"),
("pytorch", 1024, [1024, 128], 2, [16], "rank_zero"),
("tensorflow", 1024, [128], 1, [], "all_ranks"),
("pytorch", 1024, [128], 1, [], "all_ranks")])
("tensorflow", 1024, [128], 1, [16], "all_ranks"),
("pytorch", 1024, [128], 1, [16], "all_ranks")])
def test_checkpoint_epoch(framework, model_size, optimizers, num_layers, layer_params, type) -> None:
init()
clean()
@@ -239,15 +239,17 @@ def test_checkpoint_epoch(framework, model_size, optimizers, num_layers, layer_params, type) -> None:
logging.info(f" DLIO test for checkpointing at the end of epochs")
logging.info("=" * 80)
with initialize_config_dir(version_base=None, config_dir=config_dir):
epochs = 8
epoch_per_ckp = 2
cfg = compose(config_name='config',
overrides=[f'++workload.framework={framework}',
f'++workload.reader.data_loader={framework}',
'++workload.workflow.train=True',
'++workload.workflow.generate_data=True',
'++workload.train.computation_time=0.01',
'++workload.evaluation.eval_time=0.005',
'++workload.train.epochs=8', '++workload.workflow.checkpoint=True',
'++workload.checkpoint.epochs_between_checkpoints=2',
f'++workload.train.epochs={epochs}', '++workload.workflow.checkpoint=True',
f'++workload.checkpoint.epochs_between_checkpoints={epoch_per_ckp}',
f'++workload.checkpoint.type={type}',
f'++workload.checkpoint.model_size={model_size}',
f'++workload.checkpoint.optimization_groups={optimizers}',
@@ -267,11 +269,16 @@
nranks = 1
if type == "all_ranks":
nranks = comm.size
num_model_files = 1
num_optimizer_files = 1
num_layer_files = num_layers
files_per_checkpoint = (num_model_files + num_optimizer_files + num_layer_files) * nranks
if framework == "tensorflow":
num_check_files = 8 / 2 * (2 + 2 + 2*n) * nranks + 1
file_per_ckp = 2
num_check_files = epochs / epoch_per_ckp * files_per_checkpoint * file_per_ckp + 1
assert (len(load_bin) == num_check_files), f"files produced are {len(load_bin)} {num_check_files} {load_bin} "
if framework == "pytorch":
num_check_files = 8 / 2 * (1 + 1 + n) * nranks
num_check_files = epochs / epoch_per_ckp * files_per_checkpoint
assert (len(load_bin) == num_check_files), f"files produced are {len(load_bin)} {num_check_files} {load_bin}"
comm.Barrier()
if comm.rank == 0:
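As a worked check of the new assertion with the parameters above (8 epochs, a checkpoint every 2 epochs, 2 layers, all_ranks on 2 ranks), here is a small sketch mirroring the test's arithmetic:

def expected_files(epochs, epoch_per_ckp, num_layers, nranks, tensorflow):
    per_ckp = (1 + 1 + num_layers) * nranks   # model + optimizer + layer files
    total = epochs // epoch_per_ckp * per_ckp
    # TensorFlow emits an index plus a data file per object (file_per_ckp = 2)
    # and one global metadata file on top.
    return total * 2 + 1 if tensorflow else total

assert expected_files(8, 2, 2, 2, tensorflow=False) == 32   # pytorch
assert expected_files(8, 2, 2, 2, tensorflow=True) == 65    # tensorflow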