From 8c77efbbe15c150c8ca707b2c1184a324bee340a Mon Sep 17 00:00:00 2001
From: Ella Charlaix
Date: Fri, 3 Nov 2023 14:38:36 +0100
Subject: [PATCH] trainer

---
 optimum/intel/neural_compressor/trainer.py |   3 +-
 optimum/intel/openvino/trainer.py          | 294 ++++++++++++++-------
 2 files changed, 205 insertions(+), 92 deletions(-)

diff --git a/optimum/intel/neural_compressor/trainer.py b/optimum/intel/neural_compressor/trainer.py
index 1923076bb5..5ae4a1f72a 100644
--- a/optimum/intel/neural_compressor/trainer.py
+++ b/optimum/intel/neural_compressor/trainer.py
@@ -35,10 +35,9 @@
 from transformers import Trainer
 from transformers.data.data_collator import DataCollator
 from transformers.debug_utils import DebugOption, DebugUnderflowOverflow
-from transformers.file_utils import WEIGHTS_NAME

 # Integrations must be imported before ML frameworks:
-from transformers.integrations import deepspeed_init, deepspeed_load_checkpoint, hp_params
+from transformers.integrations import deepspeed_init, deepspeed_load_checkpoint, hp_params, is_deepspeed_available
 from transformers.modeling_utils import PreTrainedModel, get_parameter_dtype, unwrap_model
 from transformers.models.auto.modeling_auto import MODEL_FOR_CAUSAL_LM_MAPPING_NAMES
 from transformers.pytorch_utils import is_torch_less_than_1_11
diff --git a/optimum/intel/openvino/trainer.py b/optimum/intel/openvino/trainer.py
index 3e64a34b09..c3f3b25b52 100644
--- a/optimum/intel/openvino/trainer.py
+++ b/optimum/intel/openvino/trainer.py
@@ -16,12 +16,12 @@
 import io
 import math
 import os
+import shutil
 import sys
 import time
-from collections import defaultdict
 from itertools import chain
 from pathlib import Path
-from typing import Callable, Dict, List, Optional, Tuple, Type, Union
+from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Type, Union

 import openvino
 import openvino.runtime
@@ -46,21 +46,23 @@
     compress_quantize_weights_transformation,
 )
 from openvino.runtime import Core, PartialShape, save_model
+from packaging import version
+from torch import nn
 from torch.onnx import export as onnx_export
 from torch.utils._pytree import tree_map
-from torch.utils.data import DataLoader, Dataset, RandomSampler
-from torch.utils.data.distributed import DistributedSampler
-from tqdm.auto import tqdm
+from torch.utils.data import Dataset, RandomSampler
 from transformers import Trainer
 from transformers.data.data_collator import DataCollator
 from transformers.debug_utils import DebugOption, DebugUnderflowOverflow
-from transformers.integrations import deepspeed_init, deepspeed_load_checkpoint, hp_params
+
+# Integrations must be imported before ML frameworks:
+from transformers.integrations import deepspeed_init, deepspeed_load_checkpoint, hp_params, is_deepspeed_available
 from transformers.modeling_utils import PreTrainedModel, unwrap_model
 from transformers.pytorch_utils import is_torch_less_than_1_11
 from transformers.tokenization_utils_base import PreTrainedTokenizerBase
 from transformers.trainer import TRAINER_STATE_NAME, TRAINING_ARGS_NAME
 from transformers.trainer_callback import TrainerCallback, TrainerState
-from transformers.trainer_pt_utils import IterableDatasetShard
+from transformers.trainer_pt_utils import get_dataloader_sampler, get_model_param_count
 from transformers.trainer_utils import (
     EvalPrediction,
     HPSearchBackend,
@@ -68,18 +70,21 @@
     has_length,
     speed_metrics,
 )
+from transformers.training_args import ParallelMode
 from transformers.utils import (
     WEIGHTS_NAME,
+    is_accelerate_available,
     is_apex_available,
     is_sagemaker_mp_enabled,
     is_torch_tpu_available,
     logging,
 )

+from optimum.exporters import TasksManager
 from optimum.exporters.onnx import OnnxConfig
 from optimum.exporters.tasks import TasksManager

-from ..utils.constant import _TASK_ALIASES
+from ..utils.constant import _TASK_ALIASES, ONNX_WEIGHTS_NAME, TRAINING_ARGS_NAME
 from ..utils.import_utils import is_transformers_version
 from .configuration import OVConfig
 from .quantization import OVDataLoader
@@ -93,6 +98,22 @@
 )


+if is_accelerate_available():
+    from accelerate import __version__ as accelerate_version
+    from accelerate import skip_first_batches
+
+    if version.parse(accelerate_version) > version.parse("0.20.3"):
+        pass
+    DATA_SAMPLERS = [RandomSampler]
+    if version.parse(accelerate_version) > version.parse("0.23.0"):
+        from accelerate.data_loader import SeedableRandomSampler
+
+        DATA_SAMPLERS += [SeedableRandomSampler]
+
+    if is_deepspeed_available():
+        pass
+
+
 if is_apex_available():
     from apex import amp

@@ -102,6 +123,10 @@
 if is_torch_tpu_available(check_device=False):
     import torch_xla.core.xla_model as xm

+if TYPE_CHECKING:
+    from optimum.exporters.onnx import OnnxConfig
+
+
 core = Core()

 logger = logging.get_logger(__name__)
@@ -242,7 +267,9 @@ def _set_signature_columns_if_needed(self):
     def _inner_training_loop(
         self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None
     ):
+        self.accelerator.free_memory()
         self._train_batch_size = batch_size
+        logger.debug(f"Currently training with a batch size of: {self._train_batch_size}")
         # Data loader and number of training steps
         train_dataloader = self.get_train_dataloader()

@@ -253,6 +280,7 @@ def _inner_training_loop(
         total_train_batch_size = self._train_batch_size * args.gradient_accumulation_steps * args.world_size

         len_dataloader = None
+        num_train_tokens = None
         if has_length(train_dataloader):
             len_dataloader = len(train_dataloader)
             num_update_steps_per_epoch = len_dataloader // args.gradient_accumulation_steps
@@ -266,10 +294,16 @@ def _inner_training_loop(
                 # May be slightly incorrect if the last batch in the training dataloader has a smaller size but it's
                 # the best we can do.
                 num_train_samples = args.max_steps * total_train_batch_size
+                if args.include_tokens_per_second:
+                    num_train_tokens = (
+                        self.num_tokens(train_dataloader, args.max_steps) * args.gradient_accumulation_steps
+                    )
             else:
                 max_steps = math.ceil(args.num_train_epochs * num_update_steps_per_epoch)
                 num_train_epochs = math.ceil(args.num_train_epochs)
                 num_train_samples = self.num_examples(train_dataloader) * args.num_train_epochs
+                if args.include_tokens_per_second:
+                    num_train_tokens = self.num_tokens(train_dataloader) * args.num_train_epochs
         elif args.max_steps > 0:  # Rely on max_steps when dataloader does not have a working size
             max_steps = args.max_steps
             # Setting a very large number of epochs so we go as many times as necessary over the iterator.
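
A minimal sketch of the step/sample bookkeeping that the hunk above mirrors from transformers' Trainer for a dataloader with a known length. This is illustrative only and not part of the patch; all numbers are made up, and num_examples stands in for self.num_examples(train_dataloader):

import math

per_device_train_batch_size = 8
gradient_accumulation_steps = 4
world_size = 1
num_train_epochs = 3.0
len_dataloader = 1000  # batches per epoch

total_train_batch_size = per_device_train_batch_size * gradient_accumulation_steps * world_size  # 32
num_update_steps_per_epoch = max(len_dataloader // gradient_accumulation_steps, 1)  # 250
max_steps = math.ceil(num_train_epochs * num_update_steps_per_epoch)  # 750
num_examples = 8000  # stand-in for len(dataset)
num_train_samples = num_examples * num_train_epochs  # 24000.0

print(total_train_batch_size, num_update_steps_per_epoch, max_steps, num_train_samples)
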
@@ -277,6 +311,8 @@ def _inner_training_loop(
             num_update_steps_per_epoch = max_steps
             num_examples = total_train_batch_size * args.max_steps
             num_train_samples = args.max_steps * total_train_batch_size
+            if args.include_tokens_per_second:
+                num_train_tokens = self.num_tokens(train_dataloader, args.max_steps) * args.gradient_accumulation_steps
         else:
             raise ValueError(
                 "args.max_steps must be set to a positive value if dataloader does not have a length, was"
@@ -285,7 +321,7 @@ def _inner_training_loop(

         if DebugOption.UNDERFLOW_OVERFLOW in self.args.debug:
             if self.args.n_gpu > 1:
-                # torch.nn.DataParallel(model) replicates the model, creating new variables and module
+                # nn.DataParallel(model) replicates the model, creating new variables and module
                 # references registered here no longer work on other gpus, breaking the module
                 raise ValueError(
                     "Currently --debug underflow_overflow is not supported under DP. Please use DDP"
@@ -296,6 +332,11 @@ def _inner_training_loop(

         delay_optimizer_creation = is_sagemaker_mp_enabled() or self.fsdp is not None or self.is_fsdp_enabled

+        # We need to reset the scheduler, as its parameters may be different on subsequent calls
+        if self._created_lr_scheduler:
+            self.lr_scheduler = None
+            self._created_lr_scheduler = False
+
         if self.is_deepspeed_enabled:
             self.optimizer, self.lr_scheduler = deepspeed_init(self, num_training_steps=max_steps)

@@ -343,6 +384,41 @@ def _inner_training_loop(

         model = self._wrap_model(self.model_wrapped)

+        # as the model is wrapped, don't use `accelerator.prepare`
+        # this is for unhandled cases such as
+        # FSDP-XLA, SageMaker MP/DP, DataParallel, IPEX
+        use_accelerator_prepare = True if model is self.model else False
+
+        if delay_optimizer_creation:
+            if use_accelerator_prepare:
+                self.model = self.accelerator.prepare(self.model)
+            self.create_optimizer_and_scheduler(num_training_steps=max_steps)
+
+        # prepare using `accelerator` prepare
+        if use_accelerator_prepare:
+            self.model.train()
+            if hasattr(self.lr_scheduler, "step"):
+                if self.use_apex:
+                    model = self.accelerator.prepare(self.model)
+                else:
+                    model, self.optimizer = self.accelerator.prepare(self.model, self.optimizer)
+            else:
+                # to handle cases wherein we pass "DummyScheduler" such as when it is specified in DeepSpeed config.
+                model, self.optimizer, self.lr_scheduler = self.accelerator.prepare(
+                    self.model, self.optimizer, self.lr_scheduler
+                )
+
+        if self.is_fsdp_enabled:
+            self.model = self.model_wrapped = model
+
+        # for the rest of this function `model` is the outside model, whether it was wrapped or not
+        if model is not self.model:
+            self.model_wrapped = model
+
+        # backward compatibility
+        if self.is_deepspeed_enabled:
+            self.deepspeed = self.model_wrapped
+
         # ckpt loading
         if resume_from_checkpoint is not None:
             if self.is_deepspeed_enabled:
@@ -350,28 +426,25 @@ def _inner_training_loop(
             elif is_sagemaker_mp_enabled() or self.is_fsdp_enabled:
                 self._load_from_checkpoint(resume_from_checkpoint, self.model_wrapped)

-        # for the rest of this function `model` is the outside model, whether it was wrapped or not
-        if model is not self.model:
-            self.model_wrapped = model
-
-        if delay_optimizer_creation:
-            self.create_optimizer_and_scheduler(num_training_steps=max_steps)
-
         # Check if saved optimizer or scheduler states exist
         self._load_optimizer_and_scheduler(resume_from_checkpoint)

         # important: at this point:
         # self.model         is the Transformers Model
-        # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), etc.
+        # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model),
+        # FSDP(Transformers Model), Dynamo Optimized Module(Transformers Model) etc.

         # Train!
         logger.info("***** Running training *****")
-        logger.info(f"  Num examples = {num_examples}")
-        logger.info(f"  Num Epochs = {num_train_epochs}")
-        logger.info(f"  Instantaneous batch size per device = {args.per_device_train_batch_size}")
-        logger.info(f"  Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size}")
+        logger.info(f"  Num examples = {num_examples:,}")
+        logger.info(f"  Num Epochs = {num_train_epochs:,}")
+        logger.info(f"  Instantaneous batch size per device = {self.args.per_device_train_batch_size:,}")
+        if self.args.per_device_train_batch_size != self._train_batch_size:
+            logger.info(f"  Training with DataParallel so batch size has been adjusted to: {self._train_batch_size:,}")
+        logger.info(f"  Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size:,}")
         logger.info(f"  Gradient Accumulation steps = {args.gradient_accumulation_steps}")
-        logger.info(f"  Total optimization steps = {max_steps}")
+        logger.info(f"  Total optimization steps = {max_steps:,}")
+        logger.info(f"  Number of trainable parameters = {get_model_param_count(model, trainable_only=True):,}")

         self.state.epoch = 0
         start_time = time.time()
@@ -396,20 +469,19 @@ def _inner_training_loop(
             logger.info(f"  Continuing training from global step {self.state.global_step}")
             if not args.ignore_data_skip:
                 logger.info(
-                    f"  Will skip the first {epochs_trained} epochs then the first {steps_trained_in_current_epoch} "
-                    "batches in the first epoch. If this takes a lot of time, you can add the `--ignore_data_skip` "
-                    "flag to your launch command, but you will resume the training on data already seen by your model."
+                    f"  Will skip the first {epochs_trained} epochs then the first"
+                    f" {steps_trained_in_current_epoch} batches in the first epoch."
                 )
-                if self.is_local_process_zero() and not args.disable_tqdm:
-                    steps_trained_progress_bar = tqdm(total=steps_trained_in_current_epoch)
-                    steps_trained_progress_bar.set_description("Skipping the first batches")

         # Update the references
         self.callback_handler.model = self.model
         self.callback_handler.optimizer = self.optimizer
         self.callback_handler.lr_scheduler = self.lr_scheduler
         self.callback_handler.train_dataloader = train_dataloader
-        self.state.trial_name = self.hp_name(trial) if self.hp_name is not None else None
+        if self.hp_name is not None and self._trial is not None:
+            # use self._trial because the SigOpt/Optuna hpo only call `_hp_search_setup(trial)` instead of passing trial
+            # parameter to Train when using DDP.
+            self.state.trial_name = self.hp_name(self._trial)
         if trial is not None:
             assignments = trial.assignments if self.hp_search_backend == HPSearchBackend.SIGOPT else trial
             self.state.trial_params = hp_params(assignments)
@@ -422,8 +494,8 @@ def _inner_training_loop(
         self.state.is_local_process_zero = self.is_local_process_zero()
         self.state.is_world_process_zero = self.is_world_process_zero()

+        # tr_loss is a tensor to avoid synchronization of TPUs through .item()
         tr_loss = torch.tensor(0.0).to(args.device)
-        self.compression_metrics = defaultdict(lambda: torch.tensor(0.0).to(args.device))
         # _total_loss_scalar is updated everytime .item() has to be called on tr_loss and stores the sum of all losses
         self._total_loss_scalar = 0.0
         self._globalstep_last_logged = self.state.global_step
@@ -434,31 +506,33 @@ def _inner_training_loop(
         # Skip the first epochs_trained epochs to get the random state of the dataloader at the right point.
         if not args.ignore_data_skip:
             for epoch in range(epochs_trained):
-                is_random_sampler = hasattr(train_dataloader, "sampler") and isinstance(
-                    train_dataloader.sampler, RandomSampler
-                )
+                sampler = get_dataloader_sampler(train_dataloader)
+                sampler_kinds = [RandomSampler]
+                if version.parse(accelerate_version) > version.parse("0.23.0"):
+                    sampler_kinds.append(SeedableRandomSampler)
+                is_random_sampler = isinstance(sampler, tuple(sampler_kinds))
                 if is_torch_less_than_1_11 or not is_random_sampler:
                     # We just need to begin an iteration to create the randomization of the sampler.
-                    # That was before PyTorch 1.11 however...
                     for _ in train_dataloader:
                         break
                 else:
-                    # Otherwise we need to call the whole sampler cause there is some random operation added
+                    # Otherwise we need to call the whooooole sampler cause there is some random operation added
                     # AT THE VERY END!
-                    _ = list(train_dataloader.sampler)
+                    sampler = sampler if sampler is not None else []
+                    _ = list(sampler)

+        total_batched_samples = 0
         for epoch in range(epochs_trained, num_train_epochs):
-            if isinstance(train_dataloader, DataLoader) and isinstance(train_dataloader.sampler, DistributedSampler):
-                train_dataloader.sampler.set_epoch(epoch)
-            elif hasattr(train_dataloader, "dataset") and isinstance(train_dataloader.dataset, IterableDatasetShard):
-                train_dataloader.dataset.set_epoch(epoch)
+            epoch_iterator = train_dataloader
+            if hasattr(epoch_iterator, "set_epoch"):
+                epoch_iterator.set_epoch(epoch)

             # Reset the past mems state at the beginning of each epoch if necessary.
             if args.past_index >= 0:
                 self._past = None

             steps_in_epoch = (
-                len(train_dataloader)
+                len(epoch_iterator)
                 if len_dataloader is not None
                 else args.max_steps * args.gradient_accumulation_steps
             )
@@ -474,8 +548,21 @@
             if epoch == epochs_trained and resume_from_checkpoint is not None and steps_trained_in_current_epoch == 0:
                 self._load_rng_state(resume_from_checkpoint)

+            rng_to_sync = False
+            steps_skipped = 0
+            if steps_trained_in_current_epoch > 0:
+                epoch_iterator = skip_first_batches(epoch_iterator, steps_trained_in_current_epoch)
+                steps_skipped = steps_trained_in_current_epoch
+                steps_trained_in_current_epoch = 0
+                rng_to_sync = True
+
             step = -1
-            for step, inputs in enumerate(train_dataloader):
+            for step, inputs in enumerate(epoch_iterator):
+                total_batched_samples += 1
+                if rng_to_sync:
+                    self._load_rng_state(resume_from_checkpoint)
+                    rng_to_sync = False
+
                 # Skip past any already trained steps if resuming training
                 if steps_trained_in_current_epoch > 0:
                     steps_trained_in_current_epoch -= 1
@@ -494,17 +581,14 @@

                 # Must be called at the beginning of each training step to prepare the compression method
                 self.compression_controller.scheduler.step()

+                with self.accelerator.accumulate(model):
+                    tr_loss_step = self.training_step(model, inputs)
+
                 if (
-                    ((step + 1) % args.gradient_accumulation_steps != 0)
-                    and args.local_rank != -1
-                    and args._no_sync_in_gradient_accumulation
+                    args.logging_nan_inf_filter
+                    and not is_torch_tpu_available()
+                    and (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step))
                 ):
-                    # Avoid unnecessary DDP synchronization since there will be no backward pass on this example.
-                    with model.no_sync():
-                        tr_loss_step = self.training_step(model, inputs)
-                else:
-                    tr_loss_step = self.training_step(model, inputs)
-
-                if args.logging_nan_inf_filter and (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step)):
                     # if loss is nan or inf simply add the average of previous logged losses
                     tr_loss += tr_loss / (1 + self.state.global_step - self._globalstep_last_logged)
                 else:
@@ -512,57 +596,52 @@

                 self.current_flos += float(self.floating_point_ops(inputs))

-                # Optimizer step for deepspeed must be called on every step regardless of the value of gradient_accumulation_steps
-                if self.deepspeed:
-                    self.deepspeed.step()
+                is_last_step_and_steps_less_than_grad_acc = (
+                    steps_in_epoch <= args.gradient_accumulation_steps and (step + 1) == steps_in_epoch
+                )

-                if (step + 1) % args.gradient_accumulation_steps == 0 or (
+                if (
+                    total_batched_samples % args.gradient_accumulation_steps == 0
+                    or
                     # last step in epoch but step is always smaller than gradient_accumulation_steps
-                    steps_in_epoch <= args.gradient_accumulation_steps
-                    and (step + 1) == steps_in_epoch
+                    is_last_step_and_steps_less_than_grad_acc
                 ):
+                    # the `or` condition of `is_last_step_and_steps_less_than_grad_acc` is not covered
+                    # in accelerate. So, explicitly enable sync gradients to True in that case.
+                    if is_last_step_and_steps_less_than_grad_acc or (
+                        version.parse(accelerate_version) <= version.parse("0.20.3")
+                    ):
+                        self.accelerator.gradient_state._set_sync_gradients(True)
+
                     # Gradient clipping
-                    if args.max_grad_norm is not None and args.max_grad_norm > 0 and not self.deepspeed:
+                    if args.max_grad_norm is not None and args.max_grad_norm > 0:
                         # deepspeed does its own clipping

-                        if self.do_grad_scaling:
-                            # AMP: gradients need unscaling
-                            self.scaler.unscale_(self.optimizer)
-
                         if is_sagemaker_mp_enabled() and args.fp16:
                             self.optimizer.clip_master_grads(args.max_grad_norm)
-                        elif hasattr(self.optimizer, "clip_grad_norm"):
-                            # Some optimizers (like the sharded optimizer) have a specific way to do gradient clipping
-                            self.optimizer.clip_grad_norm(args.max_grad_norm)
-                        elif hasattr(model, "clip_grad_norm_"):
-                            # Some models (like FullyShardedDDP) have a specific way to do gradient clipping
-                            model.clip_grad_norm_(args.max_grad_norm)
-                        else:
+                        elif self.use_apex:
                             # Revert to normal clipping otherwise, handling Apex or full precision
-                            torch.nn.utils.clip_grad_norm_(
-                                amp.master_params(self.optimizer) if self.use_apex else model.parameters(),
+                            nn.utils.clip_grad_norm_(
+                                amp.master_params(self.optimizer),
+                                args.max_grad_norm,
+                            )
+                        else:
+                            self.accelerator.clip_grad_norm_(
+                                model.parameters(),
                                 args.max_grad_norm,
                             )

                     # Optimizer step
-                    optimizer_was_run = True
-                    if self.deepspeed:
-                        pass  # called outside the loop
-                    elif self.do_grad_scaling:
-                        scale_before = self.scaler.get_scale()
-                        self.scaler.step(self.optimizer)
-                        self.scaler.update()
-                        scale_after = self.scaler.get_scale()
-                        optimizer_was_run = scale_before <= scale_after
-                    else:
-                        self.optimizer.step()
-
-                    if optimizer_was_run and not self.deepspeed:
-                        self.lr_scheduler.step()
+                    self.optimizer.step()
+                    optimizer_was_run = not self.accelerator.optimizer_step_was_skipped
+                    if optimizer_was_run:
+                        # Delay optimizer scheduling until metrics are generated
+                        if not isinstance(self.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
+                            self.lr_scheduler.step()

                     model.zero_grad()
                     self.state.global_step += 1
-                    self.state.epoch = epoch + (step + 1) / steps_in_epoch
+                    self.state.epoch = epoch + (step + 1 + steps_skipped) / steps_in_epoch
                     self.control = self.callback_handler.on_step_end(args, self.state, self.control)

                     self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval)
@@ -573,7 +652,7 @@
                     break
             if step < 0:
                 logger.warning(
-                    "There seems to be not a single sample in your train_dataloader, stopping training at step"
+                    "There seems to be not a single sample in your epoch_iterator, stopping training at step"
                     f" {self.state.global_step}! This is expected if you're using an IterableDataset and set"
                     f" num_steps ({max_steps}) higher than the number of available samples."
                 )
@@ -582,6 +661,15 @@

             self.control = self.callback_handler.on_epoch_end(args, self.state, self.control)
             self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval)

+            if DebugOption.TPU_METRICS_DEBUG in self.args.debug:
+                if is_torch_tpu_available():
+                    # tpu-comment: Logging debug metrics for PyTorch/XLA (compile, execute times, ops, etc.)
+                    xm.master_print(met.metrics_report())
+                else:
+                    logger.warning(
+                        "You enabled PyTorch/XLA debug metrics but you don't have a TPU "
+                        "configured. Check your training configuration if this is unexpected."
+                    )
             if self.control.should_training_stop:
                 break
@@ -591,8 +679,10 @@
         logger.info("\n\nTraining completed. Do not forget to share your model on huggingface.co/models =)\n\n")

         if args.load_best_model_at_end and self.state.best_model_checkpoint is not None:
-            # Wait for everyone to get here so we are sur the model has been saved by process 0.
-            if args.local_rank != -1:
+            # Wait for everyone to get here so we are sure the model has been saved by process 0.
+            if is_torch_tpu_available():
+                xm.rendezvous("load_best_model_at_end")
+            elif args.parallel_mode == ParallelMode.DISTRIBUTED:
                 dist.barrier()
             elif is_sagemaker_mp_enabled():
                 smp.barrier()
@@ -603,7 +693,13 @@
         self._total_loss_scalar += tr_loss.item()
         train_loss = self._total_loss_scalar / self.state.global_step

-        metrics = speed_metrics("train", start_time, num_samples=num_train_samples, num_steps=self.state.max_steps)
+        metrics = speed_metrics(
+            "train",
+            start_time,
+            num_samples=num_train_samples,
+            num_steps=self.state.max_steps,
+            num_tokens=num_train_tokens,
+        )
         self.store_flos()
         metrics["total_flos"] = self.state.total_flos
         metrics["train_loss"] = train_loss
@@ -614,8 +710,26 @@

         self.log(metrics)

+        run_dir = self._get_output_dir(trial)
+        checkpoints_sorted = self._sorted_checkpoints(use_mtime=False, output_dir=run_dir)
+
+        # Delete the last checkpoint when save_total_limit=1 if it's different from the best checkpoint and process allowed to save.
+        if self.args.should_save and self.state.best_model_checkpoint is not None and self.args.save_total_limit == 1:
+            for checkpoint in checkpoints_sorted:
+                if not os.path.samefile(checkpoint, self.state.best_model_checkpoint):
+                    logger.info(f"Deleting older checkpoint [{checkpoint}] due to args.save_total_limit")
+                    shutil.rmtree(checkpoint)
+
         self.control = self.callback_handler.on_train_end(args, self.state, self.control)

+        # Wait for the checkpoint to be uploaded.
+        self._finish_current_push()
+
+        # After training we make sure to retrieve back the original forward pass method
+        # for the embedding layer by removing the forward post hook.
+        if self.neftune_noise_alpha is not None:
+            self._deactivate_neftune(self.model)
+
         return TrainOutput(self.state.global_step, train_loss, metrics)

     def compute_distillation_loss(self, inputs, student_outputs):
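
The training-step changes above follow the accelerate-based pattern that transformers' Trainer moved to: accumulate() replaces the manual no_sync()/modulo bookkeeping, clip_grad_norm_() replaces the scaler-aware clipping branches, and optimizer_step_was_skipped replaces the GradScaler scale comparison that used to gate lr_scheduler.step(). A minimal, self-contained sketch of that pattern on a toy model, illustrative only and not OVTrainer code (the model, data and hyperparameters are stand-ins):

import torch
from accelerate import Accelerator
from torch import nn
from torch.utils.data import DataLoader, TensorDataset

accelerator = Accelerator(gradient_accumulation_steps=4)

model = nn.Linear(16, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10)
dataset = TensorDataset(torch.randn(64, 16), torch.randn(64, 1))
dataloader = DataLoader(dataset, batch_size=8)

model, optimizer, dataloader, lr_scheduler = accelerator.prepare(model, optimizer, dataloader, lr_scheduler)

max_grad_norm = 1.0
for inputs, targets in dataloader:
    # gradients are only synchronized (and the optimizer actually stepped) every 4 batches
    with accelerator.accumulate(model):
        loss = nn.functional.mse_loss(model(inputs), targets)
        accelerator.backward(loss)
        # clipping is a no-op on the non-sync micro-steps
        accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        # mirrors the patch: only advance the schedule when the step was not skipped
        if not accelerator.optimizer_step_was_skipped:
            lr_scheduler.step()
        optimizer.zero_grad()

skip_first_batches(dataloader, n), used in the resume path above, comes from the same accelerate package.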