diff --git a/Dockerfile b/Dockerfile index e7828026..411545f2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM ubuntu:20.04 +FROM bitnami/pytorch:1.12.1 MAINTAINER Jeroen Galjaard @@ -11,14 +11,14 @@ ARG DEBIAN_FRONTEND=noninteractive # Define the working directory of the current Docker container WORKDIR /opt/federation-lab -# Update the Ubuntu software repository and fetch packages -RUN apt-get update \ - && apt-get install -y python3.9 +## Update the Ubuntu software repository and fetch packages +#RUN apt-get update \ +# && apt-get install -y python3.9 # Setup pip3.9 for dependencies -RUN apt install -y curl python3.9-distutils -RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py -RUN python3 get-pip.py +#RUN apt install -y curl python3.9-distutils +#RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py +#RUN python3 get-pip.py # Add Pre-downloaded models (otherwise needs be run every-time) ADD data/ data/ diff --git a/charts/orchestrator/Chart.yaml b/charts/orchestrator/Chart.yaml index 907d1138..639d229f 100644 --- a/charts/orchestrator/Chart.yaml +++ b/charts/orchestrator/Chart.yaml @@ -1,6 +1,6 @@ name: ../docker-compose-gcloud -description: Helm Chart for running the Federator/Orchestrator for the FLTK framework -version: 0.2.0 +description: Helm Chart for running the Orchestrator for the FLTK framework. Requires +version: 0.3.0 apiVersion: v1 appVersion: 1.17.0 keywords: @@ -8,3 +8,8 @@ keywords: - FLTK sources: home: +#dependencies: +#- name: fltk-extractor +# version: "0.1.0" +# repository: "file://../extractor" +# reason: diff --git a/experiments/test/federated.yaml b/experiments/test/federated.yaml index b51efc22..462d12fc 100644 --- a/experiments/test/federated.yaml +++ b/experiments/test/federated.yaml @@ -5,7 +5,7 @@ cuda: false scheduler_step_size: 500 scheduler_gamma: 0.2 rounds: 1 -epochs: 2500 +epochs: 0 lr: 0.05 momentum: 0.011 shuffle: True @@ -18,7 +18,7 @@ loss_function: CrossEntropyLoss clients_per_round: 1 distributed: true single_machine: false -aggregation: Sum +aggregation: FedAvg dataset_name: mnist net_name: FashionMNISTCNN data_sampler: uniform diff --git a/fltk/__main__.py b/fltk/__main__.py index 1276ee61..8c03c460 100644 --- a/fltk/__main__.py +++ b/fltk/__main__.py @@ -5,6 +5,7 @@ import sys + from fltk.launch import launch_extractor, launch_client, launch_single, launch_remote, launch_cluster, launch_signature from fltk.util.config import get_distributed_config from fltk.util.config.arguments import create_all_subparsers @@ -68,14 +69,15 @@ def main(): print('No configuration path is provided.') launch_fn: launch_signature = __run_op_dict[args.action] + launch_fn(arg_path, conf_path, + _save_get(args, 'rank'), + _save_get(args, 'nic'), + _save_get(args, 'host'), + _save_get(args, 'prefix'), + args, + distributed_config) try: - launch_fn(arg_path, conf_path, - _save_get(args, 'rank'), - _save_get(args, 'nic'), - _save_get(args, 'host'), - _save_get(args, 'prefix'), - args, - distributed_config) + pass except Exception as e: print(f"Failed with reason: {e}") parser.print_help() diff --git a/fltk/core/client.py b/fltk/core/client.py index a97068c1..6d63f09e 100644 --- a/fltk/core/client.py +++ b/fltk/core/client.py @@ -61,11 +61,13 @@ def run(self): self.running = True event = multiprocessing.Event() while self.running: + # Hack for running on Kubeflow if not self.request_queue.empty(): - self.logger.info("Got request, running synchronously") request = self.request_queue.get() + self.logger.info(f"Got request, args: {request} running synchronously.") self.result_queue.put(self.exec_round(*request)) event.wait(1) + self.logger.info(f"Exiting client {self.id}") def stop_client(self): """ @@ -109,6 +111,7 @@ def train(self, num_epochs: int, round_id: int): outputs = self.net(inputs) loss = self.loss_function(outputs, labels) + running_loss += loss.detach().item() loss.backward() self.optimizer.step() @@ -123,7 +126,9 @@ def train(self, num_epochs: int, round_id: int): end_time = time.time() duration = end_time - start_time self.logger.info(f'{progress} Train duration is {duration} seconds') - + # Clear gradients before we send. + self.optimizer.zero_grad(set_to_none=True) + gc.collect() return final_running_loss, self.get_nn_parameters() def set_tau_eff(self, total): @@ -154,7 +159,7 @@ def test(self) -> Tuple[float, float, np.array]: outputs = self.net(images) _, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member - total += labels.size(0) + total += int(labels.size(0)) correct += (predicted == labels).sum().detach().item() targets_.extend(labels.cpu().view_as(predicted).numpy()) diff --git a/fltk/core/distributed/orchestrator.py b/fltk/core/distributed/orchestrator.py index 9cb71e61..3aebb962 100644 --- a/fltk/core/distributed/orchestrator.py +++ b/fltk/core/distributed/orchestrator.py @@ -42,8 +42,9 @@ def _generate_experiment_path_name(task: ArrivalTask, u_id: Union[uuid.UUID, str @rtype: str """ log_dir = config.execution_config.log_path + experiment_path = config.execution_config.experiment_prefix experiment_name = f"{task.dataset}_{task.network}_{u_id}_{task.replication}" - full_path = f"{log_dir}/{experiment_name}" + full_path = f"{log_dir}/{experiment_path}/{experiment_name}" return full_path @@ -209,7 +210,7 @@ def _create_config_maps(self, config_maps: Dict[str, V1ConfigMap]) -> None: self._v1.create_namespaced_config_map(self._config.cluster_config.namespace, config_map) - def wait_for_jobs_to_complete(self): + def wait_for_jobs_to_complete(self, ret=False): """ Function to wait for all tasks to complete. This allows to wait for all the resources to free-up after running an experiment. Thereby allowing for running multiple experiments on a single cluster, without letting @@ -225,14 +226,15 @@ def wait_for_jobs_to_complete(self): logging.debug(msg=f"Could not retrieve job_status for {task.id}") job_status = None - if job_status and job_status in {'Completed', 'Failed'}: + if job_status and job_status in {'Completed', 'Failed', 'Succeeded'}: logging.info(f"{task.id} was completed with status: {job_status}, moving to completed") task_to_move.add(task) else: logging.info(f"Waiting for {task.id} to complete") - self.completed_tasks.update(task_to_move) self.deployed_tasks.difference_update(task_to_move) + if ret: + return time.sleep(self.SLEEP_TIME) @@ -311,7 +313,7 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None: if clear: self._clear_jobs() duration = self._config.get_duration() - while self._alive and (duration < 0 or not time.time() - start_time < duration): + while self._alive and ((duration < 0) or not time.time() - start_time < duration): # 1. Check arrivals # If new arrivals, store them in arrival PriorityQueue while not self._arrival_generator.arrivals.empty(): @@ -338,8 +340,7 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None: self._client.create(job_to_start, namespace=self._config.cluster_config.namespace) self.deployed_tasks.add(curr_task) if not self._config.cluster_config.orchestrator.parallel_execution: - self.wait_for_jobs_to_complete() - self._logger.debug("Still alive...") - time.sleep(self.SLEEP_TIME) - self.wait_for_jobs_to_complete() + self.wait_for_jobs_to_complete(ret=True) + self.wait_for_jobs_to_complete(ret=False) + self.stop() logging.info('Experiment completed.') diff --git a/fltk/core/federator.py b/fltk/core/federator.py index 839fff23..60e188a0 100644 --- a/fltk/core/federator.py +++ b/fltk/core/federator.py @@ -1,6 +1,8 @@ from __future__ import annotations import copy +import gc + import numpy as np import sklearn import time @@ -210,7 +212,7 @@ def set_tau_eff(self): # responses.append((client, _remote_method_async(Client.set_tau_eff, client.ref, total))) # torch.futures.wait_all([x[1] for x in responses]) - def test(self, net) -> Tuple[float, float, np.array]: + def test(self, net) -> Tuple[float, float, np.array, float]: """ Function to test the learned global model by the Federator. This does not take the client distributions in account but is centralized. @@ -233,7 +235,7 @@ def test(self, net) -> Tuple[float, float, np.array]: outputs = net(images) _, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member - total += labels.size(0) + total += int(labels.size(0)) correct += (predicted == labels).sum().item() targets_.extend(labels.cpu().view_as(predicted).numpy()) @@ -247,7 +249,7 @@ def test(self, net) -> Tuple[float, float, np.array]: end_time = time.time() duration = end_time - start_time self.logger.info(f'Test duration is {duration} seconds') - return accuracy, loss, confusion_mat + return accuracy, loss, confusion_mat, duration def exec_round(self, com_round_id: int): """ @@ -300,20 +302,30 @@ def all_futures_done(futures: List[torch.Future]) -> bool: # pylint: disable=no time.sleep(0.1) # self.logger.info('') # self.logger.info(f'Waiting for other clients') - + send_receive_duration = time.time() - start_time self.logger.info('Continue with rest [1]') time.sleep(3) - - updated_model = self.aggregation_method(client_weights, client_sizes) self.logger.info(f"Aggregrating: {len(client_weights)} updates, using {self.config.aggregation}") + updated_model = self.aggregation_method(client_weights, client_sizes) + self.logger.info(f"Updating global model.") + self.update_nn_parameters(updated_model) + del client_weights + gc.collect() + self.logger.info(f"Testing global model.") - test_accuracy, test_loss, conf_mat = self.test(self.net) + test_accuracy, test_loss, conf_mat, test_duration = self.test(self.net) self.logger.info(f'[Round {com_round_id:>3}] Federator has a accuracy of {test_accuracy} and loss={test_loss}') end_time = time.time() duration = end_time - start_time - record = FederatorRecord(len(selected_clients), com_round_id, duration, test_loss, test_accuracy, + record = FederatorRecord(num_selected_clients=len(selected_clients), + round_id=com_round_id, + round_duration=duration, + test_duration=test_duration, + send_receive_duration=send_receive_duration, + test_loss=test_loss, + test_accuracy=test_accuracy, confusion_matrix=conf_mat) self.exp_data.append(record) self.logger.info(f'[Round {com_round_id:>3}] Round duration is {duration} seconds') diff --git a/fltk/datasets/federated/cifar100.py b/fltk/datasets/federated/cifar100.py index 22979b53..0a1f417b 100644 --- a/fltk/datasets/federated/cifar100.py +++ b/fltk/datasets/federated/cifar100.py @@ -26,8 +26,10 @@ def init_train_dataset(self): transforms.ToTensor(), normalize ]) - self.train_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), train=True, download=True, - transform=transform) + self.train_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), + train=True, + download=True, + transform=transform) self.train_sampler = get_sampler(self.train_dataset, self.args) self.train_loader = DataLoader(self.train_dataset, batch_size=self.args.batch_size, sampler=self.train_sampler) diff --git a/fltk/launch.py b/fltk/launch.py index ed1fb034..cf374761 100644 --- a/fltk/launch.py +++ b/fltk/launch.py @@ -272,7 +272,7 @@ def launch_remote(arg_path: Path, conf_path: Path, rank: Rank, nic: Optional[NIC msg = f'Starting with host={host} and port={os.environ["MASTER_PORT"]} and interface={nic}' logging.log(logging.INFO, msg) options = rpc.TensorPipeRpcBackendOptions( - num_worker_threads=16, + num_worker_threads=16 if rank == 0 else 4, rpc_timeout=0, # infinite timeout init_method='env://', _transports=["uv"] # Use LibUV backend for async/IO interaction diff --git a/fltk/nets/cifar_100_resnet.py b/fltk/nets/cifar_100_resnet.py index 33f0a21f..144a277d 100644 --- a/fltk/nets/cifar_100_resnet.py +++ b/fltk/nets/cifar_100_resnet.py @@ -82,7 +82,7 @@ def __init__(self, block: Type[torch.nn.Module] = BasicBlock, num_block=None, nu torch.nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False), torch.nn.BatchNorm2d(64), torch.nn.ReLU(inplace=True)) - # we use a different inputsize than the original paper + # we use a different input size than the original paper # so conv2_x's stride is 1 self.conv2_x = self._make_layer(block, 64, num_block[0], 1) self.conv3_x = self._make_layer(block, 128, num_block[1], 2) diff --git a/fltk/samplers/uniform.py b/fltk/samplers/uniform.py index 28b3cc8e..00796be7 100644 --- a/fltk/samplers/uniform.py +++ b/fltk/samplers/uniform.py @@ -9,4 +9,5 @@ class UniformSampler(DistributedSamplerWrapper): def __init__(self, dataset, num_replicas=None, rank=None, seed=0): super().__init__(dataset, num_replicas=num_replicas, rank=rank, seed=seed) indices = list(range(len(self.dataset))) + self.indices = indices[self.rank:self.total_size:self.n_clients] diff --git a/fltk/util/data_container.py b/fltk/util/data_container.py index f052291d..5f2ab6db 100644 --- a/fltk/util/data_container.py +++ b/fltk/util/data_container.py @@ -16,7 +16,9 @@ class DataRecord: class FederatorRecord(DataRecord): num_selected_clients: int round_id: int - round_duration: int + round_duration: float + test_duration: float + send_receive_duration: float test_loss: float test_accuracy: float # Accuracy per class? diff --git a/requirements-cpu.txt b/requirements-cpu.txt index 1329825a..30b37125 100644 --- a/requirements-cpu.txt +++ b/requirements-cpu.txt @@ -33,7 +33,6 @@ numpy==1.22.3 oauthlib==3.2.0 packaging==21.0 parameterized==0.8.1 -Pillow==8.3.2 Pint==0.17 pluggy==1.0.0 prettyprinter==0.18.0 @@ -65,16 +64,10 @@ tensorboard-data-server==0.6.1 tensorboard-plugin-wit==1.8.0 threadpoolctl==2.1.0 tomli==2.0.1 --f https://download.pytorch.org/whl/torch_stable.html -torch==1.9.0+cpu; sys_platform != "darwin" --extra-index-url https://download.pytorch.org/whl/cpu -torch==1.9.0; sys_platform == "darwin" -torchmetrics==0.5.0 -torchsummary==1.5.1 --f https://download.pytorch.org/whl/torch_stable.html -torchvision==0.10.0+cpu; sys_platform != "darwin" ---extra-index-url https://download.pytorch.org/whl/cpu -torch==1.9.0; sys_platform == "darwin" +torch==1.12.1 +torchvision==0.13.1 +torchaudio==0.12.1 tqdm==4.49.0 typing-inspect==0.7.1 typing_extensions==4.1.1