Make execution compatible with PytorchJobs
JMGaljaard committed Mar 28, 2022
1 parent 804842b commit 74eccb2
Showing 26 changed files with 433 additions and 742 deletions.
2 changes: 2 additions & 0 deletions .env
@@ -0,0 +1,2 @@
MASTER_ADDR=localhost
MASTER_PORT=5000
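
These two variables feed torch.distributed's default env:// rendezvous: inside a PytorchJob the Kubeflow operator injects them, while this .env file covers local runs. A minimal sketch of that handshake, assuming a single local process and the 'gloo' backend (both assumptions for local testing, not part of this commit):

import os
import torch.distributed as dist

# env:// reads MASTER_ADDR and MASTER_PORT from the environment;
# rank and world_size are passed explicitly here for a one-process test.
os.environ.setdefault('MASTER_ADDR', 'localhost')
os.environ.setdefault('MASTER_PORT', '5000')
dist.init_process_group(backend='gloo', init_method='env://', rank=0, world_size=1)
print(dist.get_rank(), dist.get_world_size())
dist.destroy_process_group()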
9 changes: 5 additions & 4 deletions configs/tasks/example_arrival_config.json
@@ -12,10 +12,11 @@
"action": "train"
},
"hyperParameters": {
"batchSize": "128",
"maxEpoch": "5",
"learningRate": "0.01",
"learningrateDecay": "0.0002"
"batchSize": 128,
"maxEpoch": 10,
"learningRate": 0.01,
"learningrateDecay": 0.0002,
"decayStepSize": 50
},
"classProbability": 0.1,
"priorities": [
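With the quoted strings replaced by JSON numbers, the client no longer needs to cast each field. A minimal sketch of a typed container for this block; the class name and from_dict helper are illustrative stand-ins, not the repository's LearningParameters:

from dataclasses import dataclass

@dataclass
class TaskHyperParameters:  # hypothetical; field names mirror the JSON keys above
    batch_size: int
    max_epoch: int
    learning_rate: float
    learning_rate_decay: float
    decay_step_size: int

    @staticmethod
    def from_dict(raw: dict) -> 'TaskHyperParameters':
        return TaskHyperParameters(
            batch_size=raw['batchSize'],
            max_epoch=raw['maxEpoch'],
            learning_rate=raw['learningRate'],
            learning_rate_decay=raw['learningrateDecay'],
            decay_step_size=raw['decayStepSize'],
        )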
22 changes: 8 additions & 14 deletions fltk/__main__.py
@@ -5,23 +5,15 @@
from dotenv import load_dotenv

from fltk.launch import launch_client, launch_orchestrator
from fltk.util.config.arguments import create_client_parser, create_cluster_parser, extract_learning_parameters
from fltk.util.config.base_config import BareConfig


def main():
parser = ArgumentParser(description='Experiment launcher for the Federated Learning Testbed')
subparsers = parser.add_subparsers(dest="mode")

cluster_parser = subparsers.add_parser('cluster')
cluster_parser.add_argument('config', type=str)

client_parser = subparsers.add_parser('client')
# Option to override rank, by default provided by PytorchJob in Kubeflow.
client_parser.add_argument('--rank', type=int)
# Option to override default nic, by default is 'eth0' in containers.
client_parser.add_argument('--nic', type=str, default=None)
# Option to override 'master' host name, by default provided by PytorchJob in Kubeflow.
client_parser.add_argument('--host', type=str, default=None)
create_client_parser(subparsers)
create_cluster_parser(subparsers)

"""
To create your own parser, mirror the construction of the 'client_parser' object.
@@ -39,6 +31,8 @@ def main():
elif arguments.mode == 'client':
logging.info("Starting in client mode")
client_start(arguments, config)
logging.info("Stopping client...")
exit(0)
else:
print("Provided mode is not supported...")
exit(1)
@@ -49,13 +43,13 @@ def cluster_start(args: Namespace, configuration: BareConfig):
Function to launch an Orchestrator for execution with the provided configurations. Currently
this assumes that a single Orchestrator is started that manages all the resources in the cluster.
"""
logging.info("Starting in ")
launch_orchestrator(args=args, config=configuration)


def client_start(args: Namespace, configuration: BareConfig):
logging.info("Starting in client mode.")
launch_client(args=args, config=configuration)
learning_params = extract_learning_parameters(args)
task_id = args.task_id
launch_client(task_id, config=configuration, learning_params=learning_params)


if __name__ == "__main__":
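The inline argparse setup above moves into fltk.util.config.arguments. A sketch of what the relocated create_client_parser plausibly contains, reconstructed from the options it replaces; the positional config and task_id arguments are assumptions inferred from cluster_start and client_start:

def create_client_parser(subparsers) -> None:
    client_parser = subparsers.add_parser('client')
    client_parser.add_argument('config', type=str)
    client_parser.add_argument('task_id', type=str)  # assumed: the UID read by client_start
    # Rank is normally provided by the PytorchJob in Kubeflow, but can be overridden.
    client_parser.add_argument('--rank', type=int, default=None)
    # Default NIC inside containers is 'eth0'.
    client_parser.add_argument('--nic', type=str, default=None)
    # 'Master' host name, normally provided by the PytorchJob.
    client_parser.add_argument('--host', type=str, default=None)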
164 changes: 100 additions & 64 deletions fltk/client.py
@@ -1,6 +1,7 @@
import datetime
import logging
from pathlib import Path
from typing import Union, List

import torch
import torch.distributed as dist
@@ -9,48 +10,73 @@

from fltk.nets.util.evaluation import calculate_class_precision, calculate_class_recall
from fltk.nets.util.utils import save_model, load_model_from_file
from fltk.schedulers import MinCapableStepLR
from fltk.util.config.arguments import LearningParameters
from fltk.util.config.base_config import BareConfig
from fltk.util.results import EpochData

logging.basicConfig(level=logging.DEBUG)


class Client:

def __init__(self, rank, task_id, config: BareConfig = None):
def __init__(self, rank: int, task_id: str, world_size: int, config: BareConfig = None,
learning_params: LearningParameters = None):
"""
@param rank:
@type rank:
@param task_id:
@type task_id:
@param rank: PyTorch rank provided by the Kubeflow setup.
@type rank: int
@param task_id: String identifier representing the UID of the training task.
@type task_id: str
@param world_size: Total number of workers participating in the PytorchJob.
@type world_size: int
@param config: Parsed system configuration for the client.
@type config: BareConfig
@param learning_params: Hyperparameters extracted for this training task.
@type learning_params: LearningParameters
"""
self._logger = logging.getLogger(f'Client-{rank}-{task_id}')

self._logger.info("Initializing learning client")
self._logger.debug(f"Configuration received: {config}")
self._id = rank
self._world_size = world_size
self._task_id = task_id
self.args = config

self.config = config
self.learning_params = learning_params

# Create model and dataset
self.loss_function = self.learning_params.get_loss()()
self.dataset = self.learning_params.get_dataset_class()(self.config, self.learning_params, self._id,
self._world_size)
self.model = self.learning_params.get_model_class()()
self.device = self._init_device()
self.dataset = self.args.get_dataset()
self.model = self.args.get_model()
self.loss_function = self.args.get_loss_function()
self.optimizer: torch.nn.Module
self.scheduler = self.args.get_scheduler()

def prepare_learner(self, distributed: bool = False, backend=None):
self.optimizer = None
self.scheduler = None

def prepare_learner(self, distributed: bool = False, backend: Union[str, dist.Backend] = None):
"""
Function to prepare the learner, i.e. load all the necessary data into memory.
@param distributed: Whether to wrap the model in DistributedDataParallel for multi-node training.
@type distributed: bool
@param backend: Backend to initialize the distributed process group with, e.g. 'gloo' or 'nccl'.
@type backend: Union[str, dist.Backend]
"""
self._logger.info(f"Preparing learner model with distributed={distributed}")
self.model.to(self.device)
if distributed:
dist.init_process_group(backend)
self.model = torch.nn.parallel.DistributedDataParallel(self.model)

self.optimizer = self.args.get_optimizer(self.model.parameters(),
self.args)
# Currently an SGD optimizer is assumed; **kwargs should be used to pass optimizer arguments properly.
self.optimizer = self.learning_params.get_optimizer()(self.model.parameters(),
lr=self.learning_params.learning_rate,
momentum=0.9)
self.scheduler = MinCapableStepLR(self.optimizer,
self.config.get_scheduler_step_size(),
self.config.get_scheduler_gamma(),
self.config.get_min_lr())

def _init_device(self, cuda_device: torch.device = torch.device('cuda:0')):
def _init_device(self, cuda_device: torch.device = torch.device('cpu')):
"""
Initialize Torch to use the available devices: either prepare the CUDA device, or disable CUDA to run
with CPU-only inference/training.
@@ -60,7 +86,7 @@ def _init_device(self, cuda_device: torch.device = torch.device('cuda:0')):
@return:
@rtype:
"""
if self.args.cuda and torch.cuda.is_available():
if self.config.cuda_enabled() and torch.cuda.is_available():
return torch.device(cuda_device)
else:
# Force usage of CPU
@@ -75,42 +101,52 @@ def load_default_model(self):
"""

model_file = Path(f'{self.model.__name__}.model')
default_model_path = Path(self.args.get_default_model_folder_path()).joinpath(model_file)
default_model_path = Path(self.config.get_default_model_folder_path()).joinpath(model_file)
load_model_from_file(self.model, default_model_path)

def train(self, epoch):
def train(self, epoch, log_interval: int = 100):
"""
Function to start training, regardless of whether DistributedDataParallel (DDP) or local training is used. DDP
will account for synchronization of nodes. If an extension requires the use of torch.distributed.send and
torch.distributed.recv (for example for customized training or Federated Learning), additional
torch.distributed.barrier calls might be required.
@param epoch: Current epoch number.
@type epoch: int
@param log_interval: Iteration interval at which to log.
@type log_interval: int
"""

running_loss = 0.0
final_running_loss = 0.0

for i, (inputs, labels) in enumerate(self.dataset.get_train_loader(), 0):
inputs, labels = inputs.to(self.device), labels.to(self.device)

self.model.train()
for i, (inputs, labels) in enumerate(self.dataset.get_train_loader()):
# zero the parameter gradients
self.optimizer.zero_grad()

# forward + backward + optimize

# Forward through the net to train
outputs = self.model(inputs)

# Calculate the loss
loss = self.loss_function(outputs, labels)

# Update weights; DDP will account for synchronization of the weights.
loss.backward()
self.optimizer.step()

running_loss += float(loss.detach().item())
del loss, outputs
if i % self.args.get_log_interval() == 0:
self.args.get_logger().info(
'[%d, %5d] loss: %.3f' % (epoch, i, running_loss / self.args.get_log_interval()))
final_running_loss = running_loss / self.args.get_log_interval()
if i % log_interval == 0:
self._logger.info('[%d, %5d] loss: %.3f' % (epoch, i, running_loss / log_interval))
final_running_loss = running_loss / log_interval
running_loss = 0.0
self.scheduler.step()
# save model
if self.args.should_save_model(epoch):
self.save_model(epoch, self.args.get_epoch_save_end_suffix())

# Save model
if self.config.should_save_model(epoch):
# Note that model saving is currently not supported in the framework. However, creating a ReadWriteMany
# PVC in the deployment charts and mounting it in the appropriate directory would resolve this issue.
# This can be done by copying the setup of the PVC used to record the TensorBoard information (used by
# the logger created by the rank==0 node during the training process, i.e. to keep track of progress).
self.save_model(epoch)

return final_running_loss

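train() above ends each epoch with self.scheduler.step() on the MinCapableStepLR built in prepare_learner. A minimal sketch of such a scheduler, assuming it behaves like a StepLR whose learning rate is floored at min_lr (the actual implementation lives in fltk.schedulers):

class MinCapableStepLR:
    """Decay each param group's LR by gamma every step_size steps, never dropping below min_lr."""

    def __init__(self, optimizer, step_size: int, gamma: float, min_lr: float):
        self.optimizer = optimizer
        self.step_size = step_size
        self.gamma = gamma
        self.min_lr = min_lr
        self.steps = 0

    def step(self):
        self.steps += 1
        if self.steps % self.step_size == 0:
            for group in self.optimizer.param_groups:
                group['lr'] = max(group['lr'] * self.gamma, self.min_lr)
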
@@ -124,8 +160,10 @@ def test(self):
with torch.no_grad():
for (images, labels) in self.dataset.get_test_loader():
images, labels = images.to(self.device), labels.to(self.device)

outputs = self.model(images)
# Currently the FLTK framework assumes that a classification task is performed (hence max).
# Future work may add support for non-classification training.
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
@@ -141,50 +179,48 @@ class_precision = calculate_class_precision(confusion_mat)
class_precision = calculate_class_precision(confusion_mat)
class_recall = calculate_class_recall(confusion_mat)

self.args.get_logger().debug('Test set: Accuracy: {}/{} ({:.0f}%)'.format(correct, total, accuracy))
self.args.get_logger().debug('Test set: Loss: {}'.format(loss))
self.args.get_logger().debug("Classification Report:\n" + classification_report(targets_, pred_))
self.args.get_logger().debug("Confusion Matrix:\n" + str(confusion_mat))
self.args.get_logger().debug("Class precision: {}".format(str(class_precision)))
self.args.get_logger().debug("Class recall: {}".format(str(class_recall)))
self._logger.debug('Test set: Accuracy: {}/{} ({:.0f}%)'.format(correct, total, accuracy))
self._logger.debug('Test set: Loss: {}'.format(loss))
self._logger.debug("Classification Report:\n" + classification_report(targets_, pred_))
self._logger.debug("Confusion Matrix:\n" + str(confusion_mat))
self._logger.debug("Class precision: {}".format(str(class_precision)))
self._logger.debug("Class recall: {}".format(str(class_recall)))

return accuracy, loss, class_precision, class_recall

def run_epochs(self):
def run_epochs(self) -> List[EpochData]:
"""
Function to run epochs wit
Function to run all training epochs and collect the per-epoch results.
"""
num_epochs = self.config.epochs
max_epoch = self.learning_params.max_epoch + 1
start_time_train = datetime.datetime.now()
# Make epochs 1-indexed.
for epoch in range(1, num_epochs + 1):
loss = self.train(epoch)

epoch_results = []
for epoch in range(1, max_epoch):
train_loss = self.train(epoch)
if self._id == 0:
# Let only the 'master' node run the test set. Possibly DDP can be used
# to have a distributed test loader as well to speed up (would require
# aggregration of data.
# aggregation of data).
# Example https://github.com/fabio-deep/Distributed-Pytorch-Boilerplate/blob/0206247150720ca3e287e9531cb20ef68dc9a15f/src/datasets.py#L271-L303.
accuracy, loss, class_precision, class_recall = self.test()
elapsed_time_train = datetime.datetime.now() - start_time_train
train_time_ms = int(elapsed_time_train.total_seconds() * 1000)

elapsed_time_train = datetime.datetime.now() - start_time_train
train_time_ms = int(elapsed_time_train.total_seconds() * 1000)
start_time_test = datetime.datetime.now()
accuracy, test_loss, class_precision, class_recall = self.test()

start_time_test = datetime.datetime.now()
accuracy, test_loss, class_precision, class_recall = self.test()
elapsed_time_test = datetime.datetime.now() - start_time_test
test_time_ms = int(elapsed_time_test.total_seconds() * 1000)
elapsed_time_test = datetime.datetime.now() - start_time_test
test_time_ms = int(elapsed_time_test.total_seconds() * 1000)

data = EpochData(train_time_ms, test_time_ms, loss, accuracy, test_loss, class_precision,
class_recall, client_id=self._id)
data = EpochData(train_time_ms, test_time_ms, train_loss, accuracy, test_loss, class_precision,
class_recall, client_id=self._id)
epoch_results.append(data)
return epoch_results

def save_model(self, epoch, suffix):
def save_model(self, epoch):
"""
Move function to utils directory.
Saves the model if necessary.
"""
self.args.get_logger().debug(f"Saving model to flat file storage. Saved at epoch #{epoch}")

save_model(self.model, epoch, self.args.get_save_model_folder_path(), self.args)


self._logger.debug(f"Saving model to flat file storage. Saved at epoch #{epoch}")
save_model(self.model, self.config.get_save_model_folder_path(), epoch)
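
run_epochs leaves distributed evaluation as future work: only rank 0 runs test(). One hedged way to realize the aggregation mentioned in the comment above, assuming every rank tests on its own shard and a process group is already initialized:

import torch
import torch.distributed as dist

def aggregate_accuracy(correct: int, total: int) -> float:
    # Sum the per-rank counters so every rank sees the global test accuracy.
    counts = torch.tensor([correct, total], dtype=torch.long)
    dist.all_reduce(counts, op=dist.ReduceOp.SUM)
    return 100.0 * counts[0].item() / counts[1].item()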
1 change: 0 additions & 1 deletion fltk/datasets/__init__.py
@@ -1,4 +1,3 @@
from .distributed import *
from .cifar10 import CIFAR10Dataset
from .cifar100 import CIFAR100Dataset
from .fashion_mnist import FashionMNISTDataset
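
The Client above only touches its dataset through get_train_loader() and get_test_loader(), with a constructor receiving (config, learning_params, rank, world_size). A minimal toy class satisfying that contract, for local experimentation only; the real implementations are the CIFAR/MNIST classes imported here:

import torch
from torch.utils.data import DataLoader, TensorDataset

class ToyDataset:  # illustrative stand-in for the fltk.datasets classes
    def __init__(self, config, learning_params, rank: int, world_size: int):
        data = torch.randn(256, 3, 32, 32)
        labels = torch.randint(0, 10, (256,))
        # Naive per-rank shard; the real datasets use a DistributedSampler.
        shard = slice(rank, None, world_size)
        self._train = DataLoader(TensorDataset(data[shard], labels[shard]), batch_size=32)
        self._test = DataLoader(TensorDataset(data, labels), batch_size=32)

    def get_train_loader(self) -> DataLoader:
        return self._train

    def get_test_loader(self) -> DataLoader:
        return self._test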