Multiple GPUs using DataParallel #484

Closed · wants to merge 7 commits

docs/source/advanced_usage/trainingmodel.rst (4 changes: 3 additions & 1 deletion)

@@ -22,7 +22,9 @@ GPU usage via

       parameters.use_gpu = True

 Afterwards, the entire training will be performed on the GPU - given that
-a GPU is available.
+a GPU is available. You can also set ``parameters.use_gpu`` to a specific
+number, e.g., ``parameters.use_gpu = 4``, to use four GPUs per compute node,
+provided that many GPUs are attached.

 In cooperation with `Nvidia <https://www.nvidia.com/de-de/deep-learning-ai/solutions/machine-learning/>`_,
 advanced GPU performance optimizations have been implemented into MALA.
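
[Editor's note] As a usage illustration of the documented change, a minimal sketch; it assumes a MALA installation and a machine where the requested GPUs actually exist:

```python
import mala

parameters = mala.Parameters()

# Boolean form, as before this PR: use a single GPU if one is available.
parameters.use_gpu = True

# Integer form added by this PR: request four GPUs per compute node for
# training (the setter falls back with a warning if no GPU is found).
parameters.use_gpu = 4
```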

mala/common/parameters.py (31 changes: 21 additions & 10 deletions)

@@ -39,7 +39,7 @@ def __init__(
     ):
         super(ParametersBase, self).__init__()
         self._configuration = {
-            "gpu": False,
+            "gpu": 0,
             "horovod": False,
             "mpi": False,
             "device": "cpu",

@@ -844,10 +844,7 @@ def use_graphs(self):
     @use_graphs.setter
     def use_graphs(self, value):
         if value is True:
-            if (
-                self._configuration["gpu"] is False
-                or torch.version.cuda is None
-            ):
+            if self._configuration["gpu"] == 0 or torch.version.cuda is None:
                 parallel_warn("No CUDA or GPU found, cannot use CUDA graphs.")
                 value = False
             else:

@@ -1278,16 +1275,30 @@ def verbosity(self, value):

@property
def use_gpu(self):
"""Control whether or not a GPU is used (provided there is one)."""
"""
Control whether a GPU is used (provided there is one).

Can either be False/True or an integer. If set to False or 0,
no GPU will be used. For numbers higher than 0, the behavior differs
between training and inference. For training, the number of GPUs
set here will be used, i.e., if use_gpu=4, MALA will attempt to use
4 GPUs for training. For inference, only one GPU will be used per
MPI rank, such that the total number of GPUs used is determined by
the number of MPI ranks. Therefore, each number higher than 0 will be
treated the same in inference.
"""
return self._use_gpu

@use_gpu.setter
def use_gpu(self, value):
if value is False:
self._use_gpu = False
if value is False or value == 0:
self._use_gpu = 0
else:
if torch.cuda.is_available():
self._use_gpu = True
if value is True:
self._use_gpu = 1
else:
self._use_gpu = value
else:
parallel_warn(
"GPU requested, but no GPU found. MALA will "

@@ -1535,7 +1546,7 @@ def optuna_singlenode_setup(self, wait_time=0):
"""
# We first "trick" the parameters object to assume MPI and GPUs
# are used. That way we get the right device.
self.use_gpu = True
self.use_gpu = 1
self.use_mpi = True
device_temp = self.device
sleep(get_rank() * wait_time)
Expand Down
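
[Editor's note] The setter above coerces the mixed bool/int input into a plain integer. A standalone sketch of the resulting rules, for illustration only; the real setter emits `parallel_warn` instead of silently returning 0 when no GPU is found:

```python
def coerce_use_gpu(value, cuda_available=True):
    """Mirror of the setter's logic: False/0 -> 0, True -> 1, n -> n."""
    if value is False or value == 0:
        return 0
    if not cuda_available:
        return 0  # the real setter warns via parallel_warn here
    return 1 if value is True else value

assert coerce_use_gpu(False) == 0
assert coerce_use_gpu(True) == 1
assert coerce_use_gpu(4) == 4
assert coerce_use_gpu(True, cuda_available=False) == 0
```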

mala/descriptors/descriptor.py (2 changes: 1 addition & 1 deletion)

@@ -758,7 +758,7 @@ def _setup_lammps(
lammps_dict["ngridy"] = ny
lammps_dict["ngridz"] = nz
lammps_dict["switch"] = self.parameters.bispectrum_switchflag
if self.parameters._configuration["gpu"]:
if self.parameters._configuration["gpu"] > 0:
# Tell Kokkos to use one GPU.
lmp_cmdargs.append("-k")
lmp_cmdargs.append("on")
Expand Down
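
[Editor's note] For context, a sketch of the Kokkos arguments this branch starts to build. Only `-k on` is visible in the hunk; the remaining values follow standard LAMMPS Kokkos usage and are an assumption here, not taken from this diff:

```python
# Standard LAMMPS/Kokkos command-line arguments for one GPU:
# "-k on g 1" enables Kokkos with a single GPU, "-sf kk" applies
# the Kokkos suffix to styles.
lmp_cmdargs = ["-k", "on", "g", "1", "-sf", "kk"]

# Passed to the LAMMPS Python interface (requires a Kokkos-enabled build):
# from lammps import lammps
# lmp = lammps(cmdargs=lmp_cmdargs)
```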

mala/network/network.py (6 changes: 3 additions & 3 deletions)

@@ -30,7 +30,7 @@ class Network(nn.Module):
         Parameters used to create this neural network.
     """

-    def __new__(cls, params: Parameters):
+    def __new__(cls, params: Parameters = None):
         """
         Create a neural network instance.


@@ -40,7 +40,7 @@

         Parameters
         ----------
-        params : mala.common.parametes.Parameters
+        params : mala.common.parameters.Parameters
             Parameters used to create this neural network.
         """
         model = None

@@ -454,7 +454,7 @@ def __init__(self, params):
             self.params.layer_activations[0]
         ]()

-        if params.use_gpu:
+        if params.use_gpu > 0:
             self.to("cuda")

     def forward(self, x):
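
[Editor's note] A small sketch of what the integer-valued flag means at construction time; the `Linear` layer is a hypothetical stand-in for MALA's feed-forward network, and the availability guard is added here only to keep the snippet runnable anywhere:

```python
import torch

use_gpu = 2  # hypothetical value of params.use_gpu after the setter ran
model = torch.nn.Linear(91, 11)  # stand-in, not MALA's actual architecture
if use_gpu > 0 and torch.cuda.is_available():
    model = model.to("cuda")  # mirrors the `self.to("cuda")` call above
```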

mala/network/runner.py (9 changes: 7 additions & 2 deletions)

@@ -88,7 +88,12 @@ def save_run(
         optimizer_file = run_name + ".optimizer.pth"

         self.parameters_full.save(os.path.join(save_path, params_file))
-        self.network.save_network(os.path.join(save_path, model_file))
+        if hasattr(self.network, "module"):
+            self.network.module.save_network(
+                os.path.join(save_path, model_file)
+            )
+        else:
+            self.network.save_network(os.path.join(save_path, model_file))
         self.data.input_data_scaler.save(os.path.join(save_path, iscaler_file))
         self.data.output_data_scaler.save(
             os.path.join(save_path, oscaler_file)

@@ -425,7 +430,7 @@ def __prepare_to_run(self):
"""
# See if we want to use horovod.
if self.parameters_full.use_horovod:
if self.parameters_full.use_gpu:
if self.parameters_full.use_gpu > 0:
# We cannot use "printout" here because this is supposed
# to happen on every rank.
if self.parameters_full.verbosity >= 2:
Expand Down
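
[Editor's note] The `hasattr(self.network, "module")` check in `save_run` detects a `torch.nn.DataParallel` wrapper, which keeps the original model under its `.module` attribute. A minimal sketch of the pattern, with `torch.save` standing in for MALA's `save_network`:

```python
import torch

net = torch.nn.Linear(4, 2)
if torch.cuda.device_count() > 1:
    net = torch.nn.DataParallel(net.to("cuda"))

# Saving must target the original module, not the wrapper, so that
# checkpoints load identically on single-GPU and CPU setups.
raw = net.module if hasattr(net, "module") else net
torch.save(raw.state_dict(), "model.pth")  # stand-in for save_network
```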

mala/network/trainer.py (110 changes: 93 additions & 17 deletions)

@@ -308,7 +308,7 @@ def train_network(self):
             if isinstance(self.data.training_data_sets[0], FastTensorDataset):
                 self.data.training_data_sets[0].shuffle()

-            if self.parameters._configuration["gpu"]:
+            if self.parameters._configuration["gpu"] > 0:
                 torch.cuda.synchronize(
                     self.parameters._configuration["device"]
                 )

@@ -445,7 +445,7 @@ def train_network(self):
             # to disk
             self.tensor_board.close()

-            if self.parameters._configuration["gpu"]:
+            if self.parameters._configuration["gpu"] > 0:
                 torch.cuda.synchronize(
                     self.parameters._configuration["device"]
                 )

@@ -454,7 +454,7 @@
             # in the lazy loading case).
             if self.parameters.use_shuffling_for_samplers:
                 self.data.mix_datasets()
-            if self.parameters._configuration["gpu"]:
+            if self.parameters._configuration["gpu"] > 0:
                 torch.cuda.synchronize(
                     self.parameters._configuration["device"]
                 )

@@ -559,7 +559,7 @@ def __prepare_to_train(self, optimizer_dict):
"num_workers": self.parameters.num_workers,
"pin_memory": False,
}
if self.parameters_full.use_gpu:
if self.parameters_full.use_gpu > 0:
kwargs["pin_memory"] = True

# Read last epoch

@@ -776,9 +776,20 @@ def __prepare_to_train(self, optimizer_dict):
                 )
             )

+        if self.parameters_full.use_gpu > 1:
+            if self.parameters_full.network.nn_type != "feed-forward":
+                raise Exception(
+                    "Only feed-forward networks are supported "
+                    "with multiple GPUs."
+                )
+            self.network = torch.nn.DataParallel(
+                self.network,
+                device_ids=list(range(self.parameters_full.use_gpu)),
+            )
+
     def __process_mini_batch(self, network, input_data, target_data):
         """Process a mini batch."""
-        if self.parameters._configuration["gpu"]:
+        if self.parameters._configuration["gpu"] > 0:
             if self.parameters.use_graphs and self.train_graph is None:
                 printout("Capturing CUDA graph for training.", min_verbosity=2)
                 s = torch.cuda.Stream(self.parameters._configuration["device"])
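
[Editor's note] A sketch of what the new multi-GPU branch does, assuming a node with at least `use_gpu` devices; the values and the `Linear` stand-in are hypothetical, and the guard keeps the snippet runnable on smaller machines:

```python
import torch

use_gpu = 4  # hypothetical value of parameters_full.use_gpu
model = torch.nn.Linear(91, 11)  # stand-in for the feed-forward network
if use_gpu > 1 and torch.cuda.device_count() >= use_gpu:
    # The network is already on the GPU at this point (Network.__init__);
    # DataParallel then replicates it onto devices 0..use_gpu-1 and
    # splits each input batch along dimension 0, gathering outputs on
    # device_ids[0].
    model = torch.nn.DataParallel(
        model.to("cuda"), device_ids=list(range(use_gpu))
    )
```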

@@ -799,6 +810,14 @@ def __process_mini_batch(self, network, input_data, target_data):
                         loss = network.calculate_loss(
                             prediction, target_data
                         )
+                        if hasattr(network, "module"):
+                            loss = network.module.calculate_loss(
+                                prediction, target_data
+                            )
+                        else:
+                            loss = network.calculate_loss(
+                                prediction, target_data
+                            )

                     if self.gradscaler:
                         self.gradscaler.scale(loss).backward()

@@ -827,6 +846,15 @@ def __process_mini_batch(self, network, input_data, target_data):
                             self.static_prediction, self.static_target_data
                         )

+                    if hasattr(network, "module"):
+                        self.static_loss = network.module.calculate_loss(
+                            self.static_prediction, self.static_target_data
+                        )
+                    else:
+                        self.static_loss = network.calculate_loss(
+                            self.static_prediction, self.static_target_data
+                        )
+
                 if self.gradscaler:
                     self.gradscaler.scale(self.static_loss).backward()
                 else:

@@ -851,7 +879,12 @@
                 torch.cuda.nvtx.range_pop()

                 torch.cuda.nvtx.range_push("loss")
-                loss = network.calculate_loss(prediction, target_data)
+                if hasattr(network, "module"):
+                    loss = network.module.calculate_loss(
+                        prediction, target_data
+                    )
+                else:
+                    loss = network.calculate_loss(prediction, target_data)
                 # loss
                 torch.cuda.nvtx.range_pop()

@@ -874,7 +907,10 @@
             return loss
         else:
             prediction = network(input_data)
-            loss = network.calculate_loss(prediction, target_data)
+            if hasattr(network, "module"):
+                loss = network.module.calculate_loss(prediction, target_data)
+            else:
+                loss = network.calculate_loss(prediction, target_data)
             loss.backward()
             self.optimizer.step()
             self.optimizer.zero_grad()
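
[Editor's note] Why the loss call has to go through `.module`: `DataParallel` only parallelizes `forward()`, so custom methods such as MALA's `calculate_loss` live on the wrapped model. A self-contained sketch (with a hypothetical `TinyNet` in place of MALA's network) that runs on CPU or GPU:

```python
import torch

class TinyNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(8, 1)

    def forward(self, x):
        return self.layer(x)

    def calculate_loss(self, prediction, target):
        # Stand-in for MALA's loss method; not part of nn.Module,
        # so the DataParallel wrapper does not expose it.
        return torch.nn.functional.mse_loss(prediction, target)

device = "cuda" if torch.cuda.is_available() else "cpu"
net = TinyNet().to(device)
if torch.cuda.device_count() > 1:
    net = torch.nn.DataParallel(net)

x = torch.randn(16, 8, device=device)
y = torch.randn(16, 1, device=device)
prediction = net(x)  # DataParallel splits the batch across GPUs here
raw = net.module if hasattr(net, "module") else net
loss = raw.calculate_loss(prediction, y)
```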

@@ -907,7 +943,7 @@ def __validate_network(self, network, data_set_type, validation_type):
                 1, device=self.parameters._configuration["device"]
             )
             with torch.no_grad():
-                if self.parameters._configuration["gpu"]:
+                if self.parameters._configuration["gpu"] > 0:
                     report_freq = self.parameters.training_report_frequency
                     torch.cuda.synchronize(
                         self.parameters._configuration["device"]

@@ -950,9 +986,16 @@ def __validate_network(self, network, data_set_type, validation_type):
                                     enabled=self.parameters.use_mixed_precision
                                 ):
                                     prediction = network(x)
-                                    loss = network.calculate_loss(
-                                        prediction, y
-                                    )
+                                    if hasattr(
+                                        network, "module"
+                                    ):
+                                        loss = network.module.calculate_loss(
+                                            prediction, y
+                                        )
+                                    else:
+                                        loss = network.calculate_loss(
+                                            prediction, y
+                                        )
                             torch.cuda.current_stream(
                                 self.parameters._configuration["device"]
                             ).wait_stream(s)

@@ -980,6 +1023,28 @@ def __validate_network(self, network, data_set_type, validation_type):
                                         self.static_prediction_validation,
                                         self.static_target_validation,
                                     )
+                                    if hasattr(network, "module"):
+                                        self.static_loss_validation = network.module.calculate_loss(
+                                            self.static_prediction_validation,
+                                            self.static_target_validation,
+                                        )
+                                    else:
+                                        self.static_loss_validation = network.calculate_loss(
+                                            self.static_prediction_validation,
+                                            self.static_target_validation,
+                                        )
+                                with torch.cuda.amp.autocast(
+                                    enabled=self.parameters.use_mixed_precision
+                                ):
+                                    self.static_prediction_validation = (
+                                        network(
+                                            self.static_input_validation
+                                        )
+                                    )
+                                    self.static_loss_validation = network.calculate_loss(
+                                        self.static_prediction_validation,
+                                        self.static_target_validation,
+                                    )

                             if self.validation_graph:
                                 self.static_input_validation.copy_(x)

@@ -993,9 +1058,14 @@
                                 enabled=self.parameters.use_mixed_precision
                             ):
                                 prediction = network(x)
-                                loss = network.calculate_loss(
-                                    prediction, y
-                                )
+                                if hasattr(network, "module"):
+                                    loss = network.module.calculate_loss(
+                                        prediction, y
+                                    )
+                                else:
+                                    loss = network.calculate_loss(
+                                        prediction, y
+                                    )
                                 validation_loss_sum += loss
                             if (
                                 batchid != 0

@@ -1027,9 +1097,15 @@
                         x = x.to(self.parameters._configuration["device"])
                         y = y.to(self.parameters._configuration["device"])
                         prediction = network(x)
-                        validation_loss_sum += network.calculate_loss(
-                            prediction, y
-                        ).item()
+
+                        if hasattr(network, "module"):
+                            loss = network.module.calculate_loss(
+                                prediction, y
+                            )
+                        else:
+                            loss = network.calculate_loss(prediction, y)
+
+                        validation_loss_sum += loss.item()
                         batchid += 1

                 validation_loss = validation_loss_sum.item() / batchid