diff --git a/dlio_benchmark/checkpointing/__init__.py b/dlio_benchmark/checkpointing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dlio_benchmark/checkpointing/base_checkpointing.py b/dlio_benchmark/checkpointing/base_checkpointing.py new file mode 100644 index 00000000..a473a026 --- /dev/null +++ b/dlio_benchmark/checkpointing/base_checkpointing.py @@ -0,0 +1,90 @@ +""" + Copyright (c) 2022, UChicago Argonne, LLC + All Rights Reserved + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" +import os +from abc import ABC, abstractmethod + +from dlio_benchmark.common.enumerations import CheckpointLocationType +from dlio_benchmark.storage.storage_factory import StorageFactory +from dlio_benchmark.utils.config import ConfigArguments +from dlio_benchmark.utils.utility import DLIOMPI + + +class BaseCheckpointing(ABC): + + def __init__(self, ext): + self.ext = ext + self.args = ConfigArguments.get_instance() + checkpoint_storage = StorageFactory().get_storage(self.args.storage_type, self.args.checkpoint_folder, + self.args.framework) + checkpoint_storage.create_namespace(exist_ok=True) + rank_to_checkpoint = self.args.my_rank + if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: + rank_to_checkpoint = 0 + if rank_to_checkpoint == self.args.my_rank: + self.model_state = None + if self.args.model_size > 0: + self.model_state = {"a": self.get_tensor(self.args.model_size)} + self.optimization_state = None + if len(self.args.optimization_groups) > 0: + self.optimization_state = dict() + tensor_array_size = 0 + for index, state in enumerate(self.args.optimization_groups): + if state > 0: + self.optimization_state[str(index)] = {'a': self.get_tensor(state), + 'b': self.get_tensor(state)} + tensor_array_size += state + self.optimization_state["combined"] = self.get_tensor(tensor_array_size) + self.layer_state = None + if len(self.args.layer_parameters) > 0: + self.layer_state = dict() + for index, state in enumerate(self.args.layer_parameters): + if state > 0: + self.layer_state[str(index)] = self.get_tensor(state) + + @abstractmethod + def get_tensor(self, size): + return [] + + @abstractmethod + def save_state(self, suffix, state): + pass + + def get_name(self, suffix): + return os.path.join(self.args.checkpoint_folder, f"{suffix}.{self.ext}") + + @abstractmethod + def checkpoint(self, epoch, step_number): + rank_to_checkpoint = DLIOMPI.get_instance().rank() + if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: + rank_to_checkpoint = 0 + if rank_to_checkpoint == DLIOMPI.get_instance().rank(): + my_rank = DLIOMPI.get_instance().rank() + if self.model_state: + self.save_state(suffix=f"model-{epoch}-{step_number}-{my_rank}", state=self.model_state) + if self.optimization_state: + self.save_state(suffix=f"optimizer-{epoch}-{step_number}-{my_rank}", state=self.optimization_state) + if rank_to_checkpoint % self.args.pipeline_parallelism == 0: + if self.layer_state and self.args.num_layers > 0: + total_layers = self.args.num_layers + if self.args.tensor_parallelism > 1: + total_layers = total_layers + self.args.tensor_parallelism + for layer in range(total_layers): + self.save_state(suffix=f"layer-{layer}-{epoch}-{step_number}-{my_rank}", state=self.layer_state) + + @abstractmethod + def finalize(self): + pass \ No newline at end of file diff --git a/dlio_benchmark/checkpointing/checkpointing_factory.py b/dlio_benchmark/checkpointing/checkpointing_factory.py new file mode 100644 index 00000000..b91c512f --- /dev/null +++ b/dlio_benchmark/checkpointing/checkpointing_factory.py @@ -0,0 +1,43 @@ +""" + Copyright (c) 2022, UChicago Argonne, LLC + All Rights Reserved + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" +import logging + +from dlio_benchmark.common.enumerations import CheckpointMechanismType +from dlio_benchmark.common.error_code import ErrorCodes +from dlio_benchmark.utils.config import ConfigArguments +from dlio_benchmark.utils.utility import utcnow + + +class CheckpointingFactory(object): + def __init__(self): + pass + + @staticmethod + def get_mechanism(checkpoint_mechanism_type): + _args = ConfigArguments.get_instance() + if _args.checkpoint_mechanism_class is not None: + logging.info(f"{utcnow()} Running DLIO with custom checkpointing mechanism " + f"class {_args.checkpoint_mechanism_class.__name__}") + return _args.checkpoint_mechanism_class.get_instance() + elif checkpoint_mechanism_type == CheckpointMechanismType.TF_SAVE: + from dlio_benchmark.checkpointing.tf_checkpointing import TFCheckpointing + return TFCheckpointing.get_instance() + elif checkpoint_mechanism_type == CheckpointMechanismType.PT_SAVE: + from dlio_benchmark.checkpointing.pytorch_checkpointing import PyTorchCheckpointing + return PyTorchCheckpointing.get_instance() + else: + raise Exception(str(ErrorCodes.EC1005)) \ No newline at end of file diff --git a/dlio_benchmark/checkpointing/pytorch_checkpointing.py b/dlio_benchmark/checkpointing/pytorch_checkpointing.py new file mode 100644 index 00000000..3a933de4 --- /dev/null +++ b/dlio_benchmark/checkpointing/pytorch_checkpointing.py @@ -0,0 +1,61 @@ +""" + Copyright (c) 2022, UChicago Argonne, LLC + All Rights Reserved + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" +import os +import torch + +from dlio_benchmark.checkpointing.base_checkpointing import BaseCheckpointing +from dlio_profiler.logger import fn_interceptor as Profile + +from dlio_benchmark.common.constants import MODULE_CHECKPOINT +from dlio_benchmark.common.enumerations import CheckpointLocationType +from dlio_benchmark.utils.utility import DLIOMPI + +dlp = Profile(MODULE_CHECKPOINT) + + +class PyTorchCheckpointing(BaseCheckpointing): + __instance = None + + @staticmethod + def get_instance(): + """ Static access method. """ + if PyTorchCheckpointing.__instance is None: + PyTorchCheckpointing.__instance = PyTorchCheckpointing() + return PyTorchCheckpointing.__instance + + @dlp.log_init + def __init__(self): + super().__init__("pt") + + @dlp.log + def get_tensor(self, size): + return torch.randint(high=1, size=(size,), dtype=torch.int8) + + @dlp.log + def save_state(self, suffix, state): + name = self.get_name(suffix) + with open(name, "wb") as f: + torch.save(state, f) + + @dlp.log + def checkpoint(self, epoch, step_number): + super().checkpoint(epoch, step_number) + + @dlp.log + def finalize(self): + super().finalize() + diff --git a/dlio_benchmark/checkpointing/tf_checkpointing.py b/dlio_benchmark/checkpointing/tf_checkpointing.py new file mode 100644 index 00000000..94e43ee0 --- /dev/null +++ b/dlio_benchmark/checkpointing/tf_checkpointing.py @@ -0,0 +1,61 @@ +""" + Copyright (c) 2022, UChicago Argonne, LLC + All Rights Reserved + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" +import os + +from dlio_benchmark.checkpointing.base_checkpointing import BaseCheckpointing +from dlio_profiler.logger import fn_interceptor as Profile +import tensorflow as tf + +from dlio_benchmark.common.constants import MODULE_CHECKPOINT +from dlio_benchmark.common.enumerations import CheckpointLocationType +from dlio_benchmark.utils.utility import DLIOMPI + +dlp = Profile(MODULE_CHECKPOINT) + + +class TFCheckpointing(BaseCheckpointing): + __instance = None + + @staticmethod + def get_instance(): + """ Static access method. """ + if TFCheckpointing.__instance is None: + TFCheckpointing.__instance = TFCheckpointing() + return TFCheckpointing.__instance + + @dlp.log_init + def __init__(self): + super().__init__("pb") + + @dlp.log + def get_tensor(self, size): + return tf.random.uniform((int(size / 4),), maxval=100, dtype=tf.dtypes.int32) + + @dlp.log + def save_state(self, suffix, state): + name = self.get_name(suffix) + checkpoint = tf.train.Checkpoint() + checkpoint.mapped = state + checkpoint.save(name) + + @dlp.log + def checkpoint(self, epoch, step_number): + super().checkpoint(epoch, step_number) + + @dlp.log + def finalize(self): + super().finalize() \ No newline at end of file diff --git a/dlio_benchmark/common/constants.py b/dlio_benchmark/common/constants.py index 23305cb2..a84da164 100644 --- a/dlio_benchmark/common/constants.py +++ b/dlio_benchmark/common/constants.py @@ -19,6 +19,7 @@ ''' MODULE_DATA_LOADER = "data_loader" MODULE_AI_FRAMEWORK = "ai_framework" +MODULE_CHECKPOINT = "checkpoint" MODULE_DATA_READER = "reader" MODULE_DATA_GENERATOR = "generator" MODULE_STORAGE = "storage" diff --git a/dlio_benchmark/common/enumerations.py b/dlio_benchmark/common/enumerations.py index 91e911f8..75b17599 100644 --- a/dlio_benchmark/common/enumerations.py +++ b/dlio_benchmark/common/enumerations.py @@ -17,9 +17,22 @@ from enum import Enum + +class CheckpointMechanismType(Enum): + """ + Different Checkpoint mechanisms. + """ + NONE = 'none' + CUSTOM = 'custom' + TF_SAVE = 'tf_save' + PT_SAVE = 'pt_save' + + def __str__(self): + return self.value + class CheckpointLocationType(Enum): """ - Different types of underlying storage + Different types of Checkpointing Locations """ RANK_ZERO = 'rank_zero' ALL_RANKS = 'all_ranks' diff --git a/dlio_benchmark/common/error_code.py b/dlio_benchmark/common/error_code.py index 2f4ccdae..fa663e37 100644 --- a/dlio_benchmark/common/error_code.py +++ b/dlio_benchmark/common/error_code.py @@ -34,4 +34,5 @@ class ErrorCodes: EC1001 = {1001, "ERROR: Incorrect Format Type"} EC1002 = {1002, "ERROR: Invalid Parameter Combination"} EC1003 = {1003, "ERROR: Invalid Data Loader"} - EC1004 = {1004, "ERROR: Not supported"} \ No newline at end of file + EC1004 = {1004, "ERROR: Not supported"} + EC1005 = {1005, "ERROR: Invalid Checkpointing Mechanism"} \ No newline at end of file diff --git a/dlio_benchmark/framework/framework.py b/dlio_benchmark/framework/framework.py index a9d6d3dc..ef56b0af 100644 --- a/dlio_benchmark/framework/framework.py +++ b/dlio_benchmark/framework/framework.py @@ -43,7 +43,6 @@ class Framework(ABC): def __init__(self): self.args = ConfigArguments.get_instance() self.output_folder = self.args.output_folder - self.checkpoint_folder = self.args.checkpoint_folder @abstractmethod @@ -53,9 +52,6 @@ def init_loader(self, format_type, epoch, data_loader=None): self.reader_valid = DataLoaderFactory.get_loader(data_loader, format_type, dataset_type=DatasetType.VALID, epoch=epoch) self.storage = StorageFactory().get_storage(self.args.storage_type, self.args.storage_root, self.args.framework) - checkpoint_storage = StorageFactory().get_storage(self.args.storage_type, self.checkpoint_folder, - self.args.framework) - checkpoint_storage.create_namespace(exist_ok=True) @abstractmethod def get_type(self): @@ -73,9 +69,6 @@ def stop_framework_profiler(self): def trace_object(self, string, step, r): pass - def checkpoint(self, epoch, step_number): - pass - def model(epoch, epoch_number, step, computation_time): sleep(computation_time) diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py index ed0cbf5b..e1cbf978 100644 --- a/dlio_benchmark/framework/tf_framework.py +++ b/dlio_benchmark/framework/tf_framework.py @@ -29,7 +29,7 @@ from dlio_benchmark.profiler.profiler_factory import ProfilerFactory from dlio_benchmark.storage.storage_factory import StorageFactory from dlio_benchmark.common.enumerations import FrameworkType, Profiler, FormatType, DatasetType, MetadataType, \ - DataLoaderType, CheckpointLocationType + DataLoaderType import tensorflow as tf from tensorflow.python.framework import errors @@ -53,32 +53,6 @@ def __init__(self, profiling): else: self.tensorboard = ProfilerFactory.get_profiler(Profiler.TENSORBOARD) self.reader_handler = None - self.model_state = None - rank_to_checkpoint = self.args.my_rank - if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: - rank_to_checkpoint = 0 - if rank_to_checkpoint == self.args.my_rank: - if self.args.model_size > 0: - self.model_state = {"a": self._get_tensor(self.args.model_size)} - self.optimization_state = None - if len(self.args.optimization_groups) > 0: - self.optimization_state = dict() - tensor_array_size = 0 - for index, state in enumerate(self.args.optimization_groups): - if state > 0: - self.optimization_state[str(index)] = {'a': self._get_tensor(state), - 'b': self._get_tensor(state)} - tensor_array_size += state - self.optimization_state["combined"] = self._get_tensor(tensor_array_size) - self.layer_state = None - if len(self.args.layer_parameters) > 0: - self.layer_state = dict() - for index, state in enumerate(self.args.layer_parameters): - if state > 0: - self.layer_state[str(index)] = self._get_tensor(state) - - def _get_tensor(self, size): - return tf.random.uniform((int(size / 4),), maxval=100, dtype=tf.dtypes.int32) @dlp.log def init_loader(self, format_type, epoch=0, data_loader=None): @@ -111,34 +85,6 @@ def stop_framework_profiler(self): def trace_object(self, string, step, r): pass # tf.profiler.experimental.Trace(string, step_num=step, _r=r) - @dlp.log - def checkpoint(self, epoch, step_number): - """ - Performs Checkpointing for a specific step number. It writes different file of different sizes. - """ - my_rank = DLIOMPI.get_instance().rank() - rank_to_checkpoint = my_rank - if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: - rank_to_checkpoint = 0 - if rank_to_checkpoint == my_rank: - if self.model_state: - fname = os.path.join(self.checkpoint_folder, f"model-{epoch}-{step_number}-{my_rank}.tf") - checkpoint = tf.train.Checkpoint() - checkpoint.mapped = self.model_state - checkpoint.save(fname) - if self.optimization_state: - fname = os.path.join(self.checkpoint_folder, f"optimizer-{epoch}-{step_number}-{my_rank}.tf") - checkpoint = tf.train.Checkpoint() - checkpoint.mapped = self.optimization_state - checkpoint.save(fname) - - if self.layer_state and self.args.num_layers > 0: - for layer in range(self.args.num_layers): - fname = os.path.join(self.checkpoint_folder, f"layer-{layer}-{epoch}-{step_number}-{my_rank}.tf") - checkpoint = tf.train.Checkpoint() - checkpoint.mapped = self.layer_state - checkpoint.save(fname) - @dlp.log def compute(self, x, epoch_number, step, computation_time): sleep(computation_time) diff --git a/dlio_benchmark/framework/torch_framework.py b/dlio_benchmark/framework/torch_framework.py index 84a6ac90..7a05fcae 100644 --- a/dlio_benchmark/framework/torch_framework.py +++ b/dlio_benchmark/framework/torch_framework.py @@ -16,7 +16,7 @@ """ from dlio_benchmark.common.error_code import ErrorCodes -from dlio_benchmark.common.enumerations import FormatType, FrameworkType, DatasetType, DataLoaderType, CheckpointLocationType +from dlio_benchmark.common.enumerations import FormatType, FrameworkType, DatasetType, DataLoaderType from dlio_benchmark.data_loader.data_loader_factory import DataLoaderFactory from dlio_benchmark.framework.framework import Framework, DummyTraceObject from dlio_benchmark.common.constants import MODULE_AI_FRAMEWORK @@ -61,31 +61,6 @@ def __init__(self, profiling): super().__init__() self.profiling = profiling self.reader_handler = None - rank_to_checkpoint = self.args.my_rank - if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: - rank_to_checkpoint = 0 - if rank_to_checkpoint == self.args.my_rank: - self.model_state = None - if self.args.model_size > 0: - self.model_state = {"a": self._get_tensor(self.args.model_size)} - self.optimization_state = None - if len(self.args.optimization_groups) > 0: - self.optimization_state = dict() - tensor_array_size = 0 - for index, state in enumerate(self.args.optimization_groups): - if state > 0: - self.optimization_state[str(index)] = {'a': self._get_tensor(state), 'b': self._get_tensor(state)} - tensor_array_size += state - self.optimization_state["combined"] = self._get_tensor(tensor_array_size) - self.layer_state = None - if len(self.args.layer_parameters) > 0: - self.layer_state = dict() - for index, state in enumerate(self.args.layer_parameters): - if state > 0: - self.layer_state[str(index)] = self._get_tensor(state) - - def _get_tensor(self, size): - return torch.randint(high=1, size=(size,), dtype=torch.int8) @dlp.log def init_loader(self, format_type, epoch=0, data_loader=None): @@ -116,29 +91,6 @@ def stop_framework_profiler(self): def trace_object(self, string, step, r): return DummyTraceObject(string, step, r) - @dlp.log - def checkpoint(self, epoch, step_number): - - rank_to_checkpoint = DLIOMPI.get_instance().rank() - if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: - rank_to_checkpoint = 0 - if rank_to_checkpoint == DLIOMPI.get_instance().rank(): - my_rank = DLIOMPI.get_instance().rank() - if self.model_state: - fname = os.path.join(self.checkpoint_folder, f"model-{epoch}-{step_number}-{my_rank}.pt") - with open(fname, "wb") as f: - torch.save(self.model_state, f) - if self.optimization_state: - fname = os.path.join(self.checkpoint_folder, f"optimizer-{epoch}-{step_number}-{my_rank}.pt") - with open(fname, "wb") as f: - torch.save(self.optimization_state, f) - - if self.layer_state and self.args.num_layers > 0: - for layer in range(self.args.num_layers): - fname = os.path.join(self.checkpoint_folder, f"layer-{layer}-{epoch}-{step_number}-{my_rank}.pt") - with open(fname, "wb") as f: - torch.save(self.layer_state, f) - @dlp.log def compute(self, x, epoch_number, step, computation_time): torch_sleep(computation_time) diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index 7c886059..b82b5f85 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -26,6 +26,7 @@ # Reduce TF and CUDA logging from numpy import random +from dlio_benchmark.checkpointing.checkpointing_factory import CheckpointingFactory from dlio_benchmark.common.constants import MODULE_DLIO_BENCHMARK os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' @@ -212,6 +213,7 @@ def initialize(self): f"Number of files for evaluation in {os.path.join(self.args.data_folder, f'{DatasetType.VALID}')} ({len(file_list_eval)}) is more than requested ({self.num_files_eval}). A subset of files will be used ") file_list_eval = file_list_eval[:self.num_files_eval] self.args.derive_configurations(file_list_train, file_list_eval) + self.checkpointing_mechanism = CheckpointingFactory().get_mechanism(self.args.checkpoint_mechanism) self.args.validate() self.comm.barrier() @@ -278,7 +280,7 @@ def _train(self, epoch): self.steps_between_checkpoints >= 0) and overall_step == self.next_checkpoint_step: self.stats.end_block(epoch, block, block_step) self.stats.start_ckpt(epoch, block, overall_step) - self.framework.checkpoint(epoch, overall_step) + self.checkpointing_mechanism.checkpoint(epoch, overall_step) self.stats.end_ckpt(epoch, block) block += 1 # Reset the number of steps after every checkpoint to mark the start of a new block @@ -299,7 +301,7 @@ def _train(self, epoch): if self.do_checkpoint and (self.steps_between_checkpoints < 0) and (epoch == self.next_checkpoint_epoch): self.stats.end_block(epoch, block, block_step) self.stats.start_ckpt(epoch, block, overall_step) - self.framework.checkpoint(epoch, overall_step) + self.checkpointing_mechanism.checkpoint(epoch, overall_step) self.stats.end_ckpt(epoch, block) self.next_checkpoint_epoch += self.epochs_between_checkpoints return overall_step @@ -356,6 +358,7 @@ def finalize(self): It finalizes the dataset once training is completed. """ self.comm.barrier() + self.checkpointing_mechanism.finalize() if not self.generate_only: if self.do_profiling: self.profiler.stop() diff --git a/dlio_benchmark/plugins/experimental/configs/workload/pt_custom_checkpoint.yaml b/dlio_benchmark/plugins/experimental/configs/workload/pt_custom_checkpoint.yaml new file mode 100644 index 00000000..b9c95eff --- /dev/null +++ b/dlio_benchmark/plugins/experimental/configs/workload/pt_custom_checkpoint.yaml @@ -0,0 +1,33 @@ +model: pt_custom_checkpoint + +framework: pytorch + +workflow: + generate_data: True + train: True + checkpoint: True + +dataset: + data_folder: data/unet3d/ + format: npz + num_files_train: 16 + num_samples_per_file: 1 + record_length: 4096 + +reader: + data_loader: pytorch + batch_size: 1 + read_threads: 1 + file_shuffle: seed + sample_shuffle: seed + +train: + epochs: 5 + computation_time: 1.3604 + +checkpoint: + checkpoint_folder: checkpoints/unet3d + checkpoint_after_epoch: 1 + epochs_between_checkpoints: 1 + model_size: 4096 + checkpoint_mechanism_classname: dlio_benchmark.plugins.experimental.src.checkpoint.pytorch_checkpointing.CustomPyTorchCheckpointing diff --git a/dlio_benchmark/plugins/experimental/src/checkpoint/__init__.py b/dlio_benchmark/plugins/experimental/src/checkpoint/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dlio_benchmark/plugins/experimental/src/checkpoint/pytorch_checkpointing.py b/dlio_benchmark/plugins/experimental/src/checkpoint/pytorch_checkpointing.py new file mode 100644 index 00000000..68b4fbaf --- /dev/null +++ b/dlio_benchmark/plugins/experimental/src/checkpoint/pytorch_checkpointing.py @@ -0,0 +1,57 @@ +""" + Copyright (c) 2022, UChicago Argonne, LLC + All Rights Reserved + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" +import os +import torch + +from dlio_benchmark.checkpointing.base_checkpointing import BaseCheckpointing +from dlio_profiler.logger import fn_interceptor as Profile + +from dlio_benchmark.common.constants import MODULE_CHECKPOINT +from dlio_benchmark.common.enumerations import CheckpointLocationType +from dlio_benchmark.utils.utility import DLIOMPI + +dlp = Profile(MODULE_CHECKPOINT) + + +class CustomPyTorchCheckpointing(BaseCheckpointing): + __instance = None + + @staticmethod + def get_instance(): + """ Static access method. """ + if CustomPyTorchCheckpointing.__instance is None: + CustomPyTorchCheckpointing.__instance = CustomPyTorchCheckpointing() + return CustomPyTorchCheckpointing.__instance + + @dlp.log_init + def __init__(self): + super().__init__("pt") + + @dlp.log + def get_tensor(self, size): + return torch.randint(high=1, size=(size,), dtype=torch.int8) + + @dlp.log + def save_state(self, suffix, state): + name = self.get_name(suffix) + with open(name, "wb") as f: + torch.save(state, f) + + @dlp.log + def checkpoint(self, epoch, step_number): + super().checkpoint(epoch, step_number) + diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index 5a3368c3..c1837bd9 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -27,7 +27,7 @@ from dlio_benchmark.common.constants import MODULE_CONFIG from dlio_benchmark.common.enumerations import StorageType, FormatType, Shuffle, ReadType, FileAccess, Compression, \ FrameworkType, \ - DataLoaderType, Profiler, DatasetType, DataLoaderSampler, CheckpointLocationType + DataLoaderType, Profiler, DatasetType, DataLoaderSampler, CheckpointLocationType, CheckpointMechanismType from dlio_benchmark.utils.utility import DLIOMPI, get_trace_name, utcnow from dataclasses import dataclass import math @@ -97,21 +97,25 @@ class ConfigArguments: do_eval: bool = False batch_size_eval: int = 1 num_files_eval: int = 0 - generation_buffer_size: int = 2 * 1073741824 # 2 GB + generation_buffer_size: int = 2 * 1073741824 # 2 GB eval_time: float = 0.0 eval_time_stdev: float = 0.0 eval_after_epoch: int = 1 epochs_between_evals: int = 1 checkpoint_type: CheckpointLocationType = CheckpointLocationType.RANK_ZERO + checkpoint_mechanism: CheckpointMechanismType = CheckpointMechanismType.NONE model_size: int = 10240 - optimization_groups: ClassVar[List[int]] = [] + optimization_groups: ClassVar[List[int]] = [] num_layers: int = 1 layer_parameters: ClassVar[List[int]] = [17371, 24740228] + tensor_parallelism: int = 1 + pipeline_parallelism: int = 1 data_loader: DataLoaderType = DataLoaderType.TENSORFLOW.value num_subfolders_train: int = 0 num_subfolders_eval: int = 0 iostat_devices: ClassVar[List[str]] = [] data_loader_classname = None + checkpoint_mechanism_classname = None data_loader_sampler: DataLoaderSampler = None reader_classname: str = None multiprocessing_context: str = "fork" @@ -133,6 +137,7 @@ class ConfigArguments: global_index_map = None data_loader_class = None reader_class = None + checkpoint_mechanism_class = None def __init__(self): """ Virtually private constructor. """ @@ -145,7 +150,7 @@ def __init__(self): def __setstate__(self, state): self.__dict__.update(state) - DLIOMPI.reset() # in 'fork' case, clear parent's DLIOMPI + DLIOMPI.reset() # in 'fork' case, clear parent's DLIOMPI DLIOMPI.get_instance().set_parent_values(self.my_rank, self.comm_size) ConfigArguments.__instance = self @@ -222,6 +227,15 @@ def validate(self): logging.warning( f"Running DLIO with {self.read_threads} threads for I/O but core available {cores_available} " f"are insufficient and can lead to lower performance.") + if self.num_layers % self.pipeline_parallelism != 0: + raise Exception( + f"Expected checkpoint.num_layers {self.num_layers} should be multiple of " + f"checkpoint.pipeline_parallelism {self.pipeline_parallelism}.") + if self.num_layers % self.tensor_parallelism != 0: + raise Exception( + f"Expected checkpoint.num_layers {self.num_layers} should be multiple of " + f"checkpoint.tensor_parallelism {self.tensor_parallelism}.") + @staticmethod def reset(): ConfigArguments.__instance = None @@ -229,11 +243,16 @@ def reset(): @dlp.log def derive_configurations(self, file_list_train=None, file_list_eval=None): self.dimension = int(math.sqrt(self.record_length)) - self.dimension_stdev = self.record_length_stdev/2.0/math.sqrt(self.record_length) + self.dimension_stdev = self.record_length_stdev / 2.0 / math.sqrt(self.record_length) self.max_dimension = self.dimension - if (self.record_length_resize>0): - self.max_dimension = int(math.sqrt(self.record_length_resize)) - if (file_list_train !=None and file_list_eval !=None): + if self.checkpoint_mechanism == CheckpointMechanismType.NONE: + if self.framework == FrameworkType.TENSORFLOW: + self.checkpoint_mechanism = CheckpointMechanismType.TF_SAVE + elif self.framework == FrameworkType.PYTORCH: + self.checkpoint_mechanism = CheckpointMechanismType.PT_SAVE + if (self.record_length_resize > 0): + self.max_dimension = int(math.sqrt(self.record_length_resize)) + if (file_list_train != None and file_list_eval != None): self.resized_image = np.random.randint(255, size=(self.max_dimension, self.max_dimension), dtype=np.uint8) self.file_list_train = file_list_train self.file_list_eval = file_list_eval @@ -260,6 +279,15 @@ def derive_configurations(self, file_list_train=None, file_list_eval=None): logging.info(f"Discovered custom data loader {class_name}") self.data_loader_class = obj break + if self.checkpoint_mechanism_classname is not None: + from dlio_benchmark.checkpointing.base_checkpointing import BaseCheckpointing + classname = self.checkpoint_mechanism_classname.split(".")[-1] + module = importlib.import_module(".".join(self.checkpoint_mechanism_classname.split(".")[:-1])) + for class_name, obj in inspect.getmembers(module): + if class_name == classname and issubclass(obj, BaseCheckpointing): + logging.info(f"Discovered custom checkpointing mechanism {class_name}") + self.checkpoint_mechanism_class = obj + break if self.reader_classname is not None: from dlio_benchmark.reader.reader_handler import FormatReader classname = self.reader_classname.split(".")[-1] @@ -296,9 +324,10 @@ def build_sample_map_iter(self, file_list, total_samples, epoch_number): process_thread_file_map[rank][thread_index] = [] selected_samples = 0 while selected_samples < self.samples_per_thread: - process_thread_file_map[rank][thread_index].append((sample_global_list[sample_index], + process_thread_file_map[rank][thread_index].append((sample_global_list[sample_index], os.path.abspath(file_list[file_index]), - sample_global_list[sample_index] % self.num_samples_per_file)) + sample_global_list[ + sample_index] % self.num_samples_per_file)) sample_index += 1 selected_samples += 1 if sample_index >= self.num_samples_per_file: @@ -340,6 +369,8 @@ def reconfigure(self, epoch_number, dataset_type): self.global_index_map = self.get_global_map_index(self.file_list_train, self.total_samples_train) else: self.global_index_map = self.get_global_map_index(self.file_list_eval, self.total_samples_eval) + + def LoadConfig(args, config): ''' Override the args by a system config (typically loaded from a YAML file) @@ -358,7 +389,7 @@ def LoadConfig(args, config): args.storage_type = StorageType(config['storage']['storage_type']) if 'storage_root' in config['storage']: args.storage_root = config['storage']['storage_root'] - + # dataset related settings if 'dataset' in config: if 'record_length' in config['dataset']: @@ -429,7 +460,7 @@ def LoadConfig(args, config): if 'file_shuffle' in reader: args.file_shuffle = reader['file_shuffle'] if 'file_access' in reader: - args.file_access = FileAccess(reader['file_access']) + args.file_access = FileAccess(reader['file_access']) if 'shuffle_size' in reader: args.shuffle_size = reader['shuffle_size'] if 'sample_shuffle' in reader: @@ -440,7 +471,7 @@ def LoadConfig(args, config): args.transfer_size = reader['transfer_size'] if 'preprocess_time' in reader: args.preprocess_time = reader['preprocess_time'] - if 'preprocess_time_stdev' in reader: + if 'preprocess_time_stdev' in reader: args.preprocess_time_stdev = reader['preprocess_time_stdev'] # training relevant setting @@ -480,6 +511,8 @@ def LoadConfig(args, config): args.steps_between_checkpoints = config['checkpoint']['steps_between_checkpoints'] if 'type' in config['checkpoint']: args.checkpoint_type = CheckpointLocationType(config['checkpoint']['type']) + if 'checkpoint_mechanism_classname' in config['checkpoint']: + args.checkpoint_mechanism_classname = config['checkpoint']['checkpoint_mechanism_classname'] if 'model_size' in config['checkpoint']: args.model_size = config['checkpoint']['model_size'] if 'optimization_groups' in config['checkpoint']: @@ -488,6 +521,10 @@ def LoadConfig(args, config): args.num_layers = config['checkpoint']['num_layers'] if 'layer_parameters' in config['checkpoint']: args.layer_parameters = config['checkpoint']['layer_parameters'] + if 'tensor_parallelism' in config['checkpoint']: + args.tensor_parallelism = config['checkpoint']['tensor_parallelism'] + if 'pipeline_parallelism' in config['checkpoint']: + args.pipeline_parallelism = config['checkpoint']['pipeline_parallelism'] if 'output' in config: if 'folder' in config['output']: args.output_folder = config['output']['folder'] diff --git a/docs/source/config.rst b/docs/source/config.rst index 3e2aabf3..21bbb6f1 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -313,7 +313,12 @@ checkpoint * - type - rank_zero - Which rank performs this checkpoint. All ranks (all_ranks) or Rank 0 (rank_zero). - + * - tensor_parallelism + - 1 + - Tensor parallelism for model. Used to determine the number of layer model files. + * - pipeline_parallelism + - 1 + - Pipeline parallelism for model. .. note:: diff --git a/docs/source/custom_checkpointing_mechanism.rst b/docs/source/custom_checkpointing_mechanism.rst new file mode 100644 index 00000000..70e58ddd --- /dev/null +++ b/docs/source/custom_checkpointing_mechanism.rst @@ -0,0 +1,78 @@ +Creating a Checkpointing Plugin +============================== + +Within DLIO Benchmark we can define custom checkpointing implementations. +This feature allows us to extend DLIO Benchmark with new checkpointing implementation easily without changing existing code. +To achieve this developers have to take the following main steps. + +1. Write their custom checkpointing. +2. Define workflow configuration. +3. Run the workload with custom checkpointing. + +Write their custom checkpointing. +-------------------------------- + +In this section, we will describe how to write the custom checkpointing. +To write a checkpointing you need to implement `BaseCheckpointing` Class. +This checkpointing needs to added `/dlio_benchmark/plugins/experimental/src/checkpointing`. +A complete examples can be seen at `/dlio_benchmark/checkpointing/` + +- For PyTorch: pytorch_checkpointing.py +- For TensorFlow: tf_checkpointing.py + +Say we store the custom checkpointing for pytorch into `/dlio_benchmark/plugins/experimental/src/checkpoint/pytorch_checkpointing.py` + +.. code-block:: python + + class CustomPyTorchCheckpointing(BaseCheckpointing): + __instance = None + + @staticmethod + def get_instance(): + """ Static access method. """ + if CustomPyTorchCheckpointing.__instance is None: + CustomPyTorchCheckpointing.__instance = CustomPyTorchCheckpointing() + return CustomPyTorchCheckpointing.__instance + + @dlp.log_init + def __init__(self): + super().__init__("pt") + + @dlp.log + def get_tensor(self, size): + return torch.randint(high=1, size=(size,), dtype=torch.int8) + + @dlp.log + def save_state(self, suffix, state): + name = self.get_name(suffix) + with open(name, "wb") as f: + torch.save(state, f) + + @dlp.log + def checkpoint(self, epoch, step_number): + super().checkpoint(epoch, step_number) + +Define workflow configuration. +------------------------------ + +In this section, we will detail how to create a custom workflow configuration for DLIO Benchmark. +The workload configuration for plugins exists in `/dlio_benchmark/plugins/experimental`. +You can copy an existing configuration from `/dlio_benchmark/configs/workload` and modify it for your custom checkpointing. +Main changes to the workflow configuration are: + +.. code-block:: yaml + + # Rest remains as it is + reader: + checkpoint_mechanism_classname: dlio_benchmark.plugins.experimental.src.checkpoint.pytorch_checkpointing.CustomPyTorchCheckpointing + + +In the above configuration, `checkpoint_mechanism_classname` should point to FQN of the class (as in the PYTHONPATH). + + +Run the workload with custom checkpointing. +------------------------------------------ + +To run the custom checkpointing, we have to define the plugin folder as the custom config folder. +This is described in the :ref:`run` page. +We need to pass path `plugins/experimental/configs` as the path. \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index ecf42573..a255b76d 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -35,6 +35,7 @@ GitHub repo: https://github.com/argonne-lcf/dlio_benchmark. custom_data_loader custom_reader + custom_checkpointing_mechanism .. toctree:: :maxdepth: 2