Commit
added checkpointing to support LLMs (#114)
* added checkpointing to support LLMs

* added indexed binary data support for LLMs.

* added configuration for megatron deepspeed.

* fixes for out of core data generation

* added dlrm configuration

* added changes to support mmapped file.

* fixed checkpointing for tensors

* Update torch_framework.py

Fix rank for merge bug.

* Update indexed_binary_generator.py

Change GB to an absolute value.

* Update megatron_deepspeed.yaml

* refactor enum for better naming

* documentation for the checkpointing.

* make data generation buffer_size configurable.

* Update tf_framework.py

Pass model size as an argument.

* Update tf_framework.py

* Update megatron_deepspeed.yaml

* Update megatron_deepspeed.yaml

* make data generation buffer_size configurable.
hariharan-devarajan authored Jan 9, 2024
1 parent 0720984 commit 0a6130a
Showing 17 changed files with 647 additions and 74 deletions.
23 changes: 22 additions & 1 deletion .github/workflows/python-package-conda.yml
@@ -81,6 +81,8 @@ jobs:
mpirun -np 2 pytest -k test_gen_data[jpeg-tensorflow] -v
mpirun -np 2 pytest -k test_gen_data[tfrecord-tensorflow] -v
mpirun -np 2 pytest -k test_gen_data[hdf5-tensorflow] -v
mpirun -np 2 pytest -k test_gen_data[indexed_binary-tensorflow] -v
mpirun -np 2 pytest -k test_gen_data[mmap_indexed_binary-tensorflow] -v
- name: test_custom_storage_root_gen_data
run: |
source ${VENV}/bin/activate
@@ -89,6 +91,8 @@ jobs:
mpirun -np 2 pytest -k test_storage_root_gen_data[jpeg-tensorflow] -v
mpirun -np 2 pytest -k test_storage_root_gen_data[tfrecord-tensorflow] -v
mpirun -np 2 pytest -k test_storage_root_gen_data[hdf5-tensorflow] -v
mpirun -np 2 pytest -k test_storage_root_gen_data[indexed_binary-tensorflow] -v
mpirun -np 2 pytest -k test_storage_root_gen_data[mmap_indexed_binary-tensorflow] -v
- name: test_train
run: |
source ${VENV}/bin/activate
@@ -113,6 +117,14 @@ jobs:
mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali] -v
- name: test_custom_storage_root_train
run: |
source ${VENV}/bin/activate
@@ -127,10 +139,19 @@ jobs:
mpirun -np 2 pytest -k test_custom_storage_root_train[jpeg-pytorch] -v
mpirun -np 2 pytest -k test_custom_storage_root_train[hdf5-pytorch] -v
mpirun -np 2 pytest -k test_custom_storage_root_train[csv-pytorch] -v
mpirun -np 2 pytest -k test_custom_storage_root_train[indexed_binary-tensorflow] -v
mpirun -np 2 pytest -k test_custom_storage_root_train[indexed_binary-pytorch] -v
mpirun -np 2 pytest -k test_custom_storage_root_train[mmap_indexed_binary-tensorflow] -v
mpirun -np 2 pytest -k test_custom_storage_root_train[mmap_indexed_binary-pytorch] -v
- name: test_checkpoint_epoch
run: |
source ${VENV}/bin/activate
mpirun -np 2 pytest -k test_checkpoint_epoch -v
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers0-2-layer_params0-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers1-2-layer_params1-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers2-2-layer_params2-rank_zero] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers3-2-layer_params3-rank_zero] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers4-1-layer_params4-all_ranks] -v
mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers5-1-layer_params5-all_ranks] -v
- name: test_checkpoint_step
run: |
source ${VENV}/bin/activate
16 changes: 16 additions & 0 deletions dlio_benchmark/common/enumerations.py
@@ -17,6 +17,16 @@

from enum import Enum

class CheckpointLocationType(Enum):
"""
Whether checkpoints are written by rank zero only or by all ranks
"""
RANK_ZERO = 'rank_zero'
ALL_RANKS = 'all_ranks'

def __str__(self):
return self.value

class StorageType(Enum):
"""
Different types of underlying storage
@@ -97,6 +107,8 @@ class FormatType(Enum):
HDF5_OPT = 'hdf5_opt'
JPEG = 'jpeg'
PNG = 'png'
INDEXED_BINARY = 'indexed_binary'
MMAP_INDEXED_BINARY = 'mmap_indexed_binary'

def __str__(self):
return self.value
@@ -119,6 +131,10 @@ def get_enum(value):
return FormatType.JPEG
elif FormatType.PNG.value == value:
return FormatType.PNG
elif FormatType.INDEXED_BINARY.value == value:
return FormatType.INDEXED_BINARY
elif FormatType.MMAP_INDEXED_BINARY.value == value:
return FormatType.MMAP_INDEXED_BINARY

class DataLoaderType(Enum):
"""
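For reference, a minimal sketch of how the new enum values map back and forth to the strings used in the workload YAML files; only CheckpointLocationType, FormatType and their values come from the diff above, the lookup calls themselves are illustrative.

from dlio_benchmark.common.enumerations import CheckpointLocationType, FormatType

# FormatType.get_enum() resolves the format string from a workload config.
fmt = FormatType.get_enum("mmap_indexed_binary")
assert fmt == FormatType.MMAP_INDEXED_BINARY
assert str(fmt) == "mmap_indexed_binary"        # __str__ returns the YAML value

# CheckpointLocationType has no get_enum helper in this diff; a plain
# value lookup works because the enum values match the YAML strings.
loc = CheckpointLocationType("all_ranks")
assert loc == CheckpointLocationType.ALL_RANKS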
34 changes: 34 additions & 0 deletions dlio_benchmark/configs/workload/dlrm.yaml
@@ -0,0 +1,34 @@
model: dlrm

framework: pytorch

workflow:
generate_data: False
train: True
do_eval: True

dataset:
data_folder: data/dlrm
format: indexed_binary
num_files_train: 1
num_files_eval: 1
num_samples_per_file: 4195198976
record_length: 327680
keep_files: True
eval_num_samples_per_file: 91681240

reader:
data_loader: pytorch
batch_size: 2048
batch_size_eval: 16384
sample_shuffle: random

train:
epochs: 1
computation_time: 0.064296
total_training_steps: 32768
total_eval_steps: 2048

evaluation:
eval_time: 0.0843
steps_between_evals: 16384
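As a rough sanity check on the numbers above, the sketch below (plain Python, not part of the commit) estimates how much data a run of this config reads, assuming record_length is the per-sample size in bytes as elsewhere in DLIO:

# Values copied from the dlrm.yaml above.
record_length = 327_680            # bytes per sample (assumed unit)
batch_size = 2_048
total_training_steps = 32_768
batch_size_eval = 16_384
total_eval_steps = 2_048

train_bytes = batch_size * total_training_steps * record_length
eval_bytes = batch_size_eval * total_eval_steps * record_length
print(f"train read volume: {train_bytes / 2**40:.0f} TiB")   # 20 TiB
print(f"eval read volume:  {eval_bytes / 2**40:.0f} TiB")    # 10 TiB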
36 changes: 36 additions & 0 deletions dlio_benchmark/configs/workload/megatron_deepspeed.yaml
@@ -0,0 +1,36 @@
# 8 node run with 4 GPUs per node and TPSIZE=4 and PPSIZE=8
model: megatron_deepspeed

framework: pytorch

workflow:
generate_data: False
train: True
checkpoint: True

dataset:
data_folder: dataset/megatron-deepspeed/
format: mmap_indexed_binary
num_files_train: 1
num_samples_per_file: 277203535
record_length: 2048

reader:
data_loader: pytorch
batch_size: 1024
read_threads: 1
file_shuffle: seed
sample_shuffle: seed

train:
epochs: 311541
computation_time: 0.03 # every iteration has 290 steps and each iteration is 8.9 sec.

checkpoint:
checkpoint_folder: checkpoints/megatron-deepspeed
steps_between_checkpoints: 1000
model_size: 30102
type: all_ranks
optimization_groups: [1009254400, 865075200, 793600]
num_layers: 44
layer_parameters: [129761280, 20971520]
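The checkpoint block is what drives the new LLM-style checkpointing. A hedged back-of-the-envelope sketch of the per-rank checkpoint volume implied by these fields follows; it assumes the optimization_groups and layer_parameters entries are element counts and that each element is serialized as 4 bytes (fp32), neither of which is stated in the YAML itself.

# Values copied from megatron_deepspeed.yaml above.
optimization_groups = [1_009_254_400, 865_075_200, 793_600]
num_layers = 44
layer_parameters = [129_761_280, 20_971_520]

bytes_per_elem = 4   # assumption: fp32 serialization
optimizer_bytes = sum(optimization_groups) * bytes_per_elem
layer_bytes = sum(layer_parameters) * num_layers * bytes_per_elem

# With type: all_ranks, every rank writes its own shard at each checkpoint.
print(f"optimizer state per rank: ~{optimizer_bytes / 2**30:.1f} GiB")  # ~7.0 GiB
print(f"layer tensors per rank:   ~{layer_bytes / 2**30:.1f} GiB")      # ~24.7 GiB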
3 changes: 3 additions & 0 deletions dlio_benchmark/data_generator/generator_factory.py
@@ -47,5 +47,8 @@ def get_generator(type):
elif type == FormatType.PNG:
from dlio_benchmark.data_generator.png_generator import PNGGenerator
return PNGGenerator()
elif type == FormatType.INDEXED_BINARY or type == FormatType.MMAP_INDEXED_BINARY:
from dlio_benchmark.data_generator.indexed_binary_generator import IndexedBinaryGenerator
return IndexedBinaryGenerator()
else:
raise Exception(str(ErrorCodes.EC1001))
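Both new format values route to the same generator, since the mmap variant only changes how the files are read back, not how they are written. A short usage sketch; the GeneratorFactory class name is inferred from the file name, and the call assumes DLIO's configuration has already been initialized:

from dlio_benchmark.common.enumerations import FormatType
from dlio_benchmark.data_generator.generator_factory import GeneratorFactory

# indexed_binary and mmap_indexed_binary share one on-disk layout.
gen = GeneratorFactory.get_generator(FormatType.MMAP_INDEXED_BINARY)
gen.generate()   # writes <file> plus <file>.off.idx and <file>.sz.idx per data file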
2 changes: 1 addition & 1 deletion dlio_benchmark/data_generator/hdf5_generator.py
@@ -44,7 +44,7 @@ def generate(self):
"""
super().generate()
np.random.seed(10)
samples_per_iter=max(1, int(32*1024*1024/self._args.record_length))
samples_per_iter=max(1, int(self._args.generation_buffer_size/self._args.record_length))
record_labels = [0] * self.num_samples
for i in dlp.iter(range(self.my_rank, int(self.total_files_to_generate), self.comm_size)):
progress(i, self.total_files_to_generate, "Generating HDF5 Data")
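The one-line change above replaces the hard-coded 32 MiB staging buffer with the new generation_buffer_size option; samples_per_iter is simply how many whole records fit in that buffer. A small illustration using the old hard-coded value and an example record size:

generation_buffer_size = 32 * 1024 * 1024   # the previously hard-coded 32 MiB
record_length = 2 * 1024 * 1024             # example: 2 MiB per sample

samples_per_iter = max(1, generation_buffer_size // record_length)
print(samples_per_iter)   # 16 samples are buffered and written per iteration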
97 changes: 97 additions & 0 deletions dlio_benchmark/data_generator/indexed_binary_generator.py
@@ -0,0 +1,97 @@
"""
Copyright (c) 2022, UChicago Argonne, LLC
All Rights Reserved
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dlio_benchmark.common.enumerations import Compression
from dlio_benchmark.data_generator.data_generator import DataGenerator

import logging
import numpy as np

from dlio_benchmark.utils.utility import progress, utcnow
from dlio_profiler.logger import fn_interceptor as Profile
from shutil import copyfile
from dlio_benchmark.common.constants import MODULE_DATA_GENERATOR
import struct

dlp = Profile(MODULE_DATA_GENERATOR)

"""
Generator for creating data in indexed binary format.
"""
class IndexedBinaryGenerator(DataGenerator):
def __init__(self):
super().__init__()

def index_file_path_off(self, prefix_path):
return prefix_path + '.off.idx'

def index_file_path_size(self, prefix_path):
return prefix_path + '.sz.idx'

@dlp.log
def generate(self):
"""
Generate samples in indexed binary format along with offset and size index files.
"""
super().generate()
np.random.seed(10)
GB=1073741824
for i in dlp.iter(range(self.my_rank, int(self.total_files_to_generate), self.comm_size)):
dim1, dim2 = self.get_dimension()
sample_size = dim1 * dim2
total_size = sample_size * self.num_samples
write_size = total_size
memory_size = self._args.generation_buffer_size
if total_size > memory_size:
write_size = memory_size - (memory_size % sample_size)
out_path_spec = self.storage.get_uri(self._file_list[i])
out_path_spec_off_idx = self.index_file_path_off(out_path_spec)
out_path_spec_sz_idx = self.index_file_path_size(out_path_spec)
progress(i + 1, self.total_files_to_generate, "Generating Indexed Binary Data")
prev_out_spec = out_path_spec
written_bytes = 0
data_file = open(out_path_spec, "wb")
off_file = open(out_path_spec_off_idx, "wb")
sz_file = open(out_path_spec_sz_idx, "wb")
records = np.random.randint(255, size=write_size, dtype=np.uint8)
while written_bytes < total_size:
data_to_write = write_size if written_bytes + write_size <= total_size else total_size - written_bytes
samples_to_write = data_to_write // sample_size

# Write data
myfmt = 'B' * data_to_write
binary_data = struct.pack(myfmt, *records[:data_to_write])
data_file.write(binary_data)

# Write offsets
myfmt = 'Q' * samples_to_write
offsets = range(0, data_to_write, sample_size)
offsets = offsets[:samples_to_write]
binary_offsets = struct.pack(myfmt, *offsets)
off_file.write(binary_offsets)

# Write sizes
myfmt = 'Q' * samples_to_write
sample_sizes = [sample_size] * samples_to_write
binary_sizes = struct.pack(myfmt, *sample_sizes)
sz_file.write(binary_sizes)

written_bytes = written_bytes + data_to_write
data_file.close()
off_file.close()
sz_file.close()
np.random.seed()
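For each data file the generator produces three artifacts: the raw sample bytes, a .off.idx file holding one uint64 offset per sample, and a .sz.idx file holding one uint64 size per sample. The sketch below shows how a sample could be read back using those index files; it mirrors the layout written above but is not the reader that ships with this commit.

import struct
import numpy as np

def read_sample(prefix_path, sample_index):
    """Read one sample from an indexed binary file produced by the generator above."""
    with open(prefix_path + '.off.idx', 'rb') as f:
        f.seek(sample_index * 8)               # offsets are uint64 ('Q')
        offset, = struct.unpack('Q', f.read(8))
    with open(prefix_path + '.sz.idx', 'rb') as f:
        f.seek(sample_index * 8)               # sizes are uint64 ('Q')
        size, = struct.unpack('Q', f.read(8))
    # The mmap_indexed_binary variant uses the same files; mapping the data
    # file once and slicing per sample avoids a read syscall per sample.
    data = np.memmap(prefix_path, dtype=np.uint8, mode='r')
    return np.array(data[offset:offset + size])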
17 changes: 14 additions & 3 deletions dlio_benchmark/framework/framework.py
@@ -16,6 +16,10 @@
"""

from abc import ABC, abstractmethod

from dlio_benchmark.common.enumerations import DatasetType
from dlio_benchmark.data_loader.data_loader_factory import DataLoaderFactory
from dlio_benchmark.storage.storage_factory import StorageFactory
from dlio_benchmark.utils.utility import utcnow

from time import sleep
@@ -40,11 +44,18 @@ def __init__(self):
self.args = ConfigArguments.get_instance()
self.output_folder = self.args.output_folder
self.checkpoint_folder = self.args.checkpoint_folder
pass


@abstractmethod
def init_loader(self, format_type, epoch_number, data_loader=None):
pass
def init_loader(self, format_type, epoch, data_loader=None):
self.reader_train = DataLoaderFactory.get_loader(data_loader, format_type,
dataset_type=DatasetType.TRAIN, epoch=epoch)
self.reader_valid = DataLoaderFactory.get_loader(data_loader, format_type,
dataset_type=DatasetType.VALID, epoch=epoch)
self.storage = StorageFactory().get_storage(self.args.storage_type, self.args.storage_root, self.args.framework)
checkpoint_storage = StorageFactory().get_storage(self.args.storage_type, self.checkpoint_folder,
self.args.framework)
checkpoint_storage.create_namespace(exist_ok=True)

@abstractmethod
def get_type(self):
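init_loader now carries a concrete base implementation, building the train and validation loaders and creating the checkpoint folder namespace, even though it is still declared @abstractmethod, so concrete frameworks override it and delegate back. A minimal sketch of that pattern, assuming the base class is named Framework as in this file; it is not the actual tf_framework/torch_framework code:

from dlio_benchmark.framework.framework import Framework

class MyFramework(Framework):
    """Illustrative subclass only; real frameworks add framework-specific setup."""

    def init_loader(self, format_type, epoch, data_loader=None):
        # Reuse the base implementation for reader_train / reader_valid and
        # for creating the checkpoint storage namespace.
        super().init_loader(format_type, epoch, data_loader)
        # Framework-specific device / session initialization would go here.

    def get_type(self):
        return "my_framework"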