From c9ce5769483a9995bbebc8a591b3ec0010db1760 Mon Sep 17 00:00:00 2001 From: Jeroen Galjaard Date: Thu, 2 Sep 2021 17:46:10 +0200 Subject: [PATCH] Integrate features for MVP --- .dockerignore | 9 +- .gitignore | 2 +- Dockerfile | 10 +- README.md | 43 +++- charts/extractor/Chart.yaml | 10 + .../extractor/templates/fl-extractor-pod.yaml | 38 ++++ .../fl-log-claim-persistentvolumeclaim.yaml | 14 ++ ...fl-server-claim-persistentvolumeclaim.yaml | 15 ++ charts/extractor/test.yaml | 38 ---- charts/extractor/values.yaml | 5 + charts/federator/Chart.yaml | 7 +- ...fl-server-claim-persistentvolumeclaim.yaml | 14 -- .../templates/fl-server-clusterrole.yaml | 1 + charts/federator/templates/fl-server-pod.yaml | 13 +- .../templates/fl-server-service.yaml | 6 +- charts/fltk-values.yaml | 6 +- charts/worker/templates/client-slow.yaml | 4 - configs/example_cloud_experiment.json | 1 + configs/tasks/example_arrival_config.json | 20 +- fltk/__main__.py | 22 +- fltk/client.py | 81 ++++--- fltk/datasets/cifar10.py | 4 +- fltk/datasets/dataset.py | 3 - fltk/extractor.py | 18 ++ fltk/launch.py | 22 +- fltk/nets/util/evaluation.py | 4 +- fltk/orchestrator.py | 123 ++++------ fltk/schedulers/min_lr_step.py | 10 +- fltk/util/cluster/client.py | 213 +++++++++++------- fltk/util/cluster/conversion.py | 3 +- fltk/util/config/arguments.py | 24 +- fltk/util/config/base_config.py | 18 +- fltk/util/results.py | 8 +- fltk/util/task/config/parameter.py | 22 +- fltk/util/task/generator/arrival_generator.py | 35 +-- fltk/util/task/task.py | 13 +- requirements.txt | 19 +- 37 files changed, 532 insertions(+), 366 deletions(-) create mode 100644 charts/extractor/Chart.yaml create mode 100644 charts/extractor/templates/fl-extractor-pod.yaml create mode 100644 charts/extractor/templates/fl-log-claim-persistentvolumeclaim.yaml create mode 100644 charts/extractor/templates/fl-server-claim-persistentvolumeclaim.yaml delete mode 100644 charts/extractor/test.yaml create mode 100644 charts/extractor/values.yaml delete mode 100644 charts/federator/templates/fl-server-claim-persistentvolumeclaim.yaml create mode 100644 fltk/extractor.py diff --git a/.dockerignore b/.dockerignore index 1fe8ba7d..86318042 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,3 +1,6 @@ -venv -data_loaders -simple_example +# Ignoring the venv +venv/ + +# +# Ignoring all the compressed archives +**/*.tar.gz diff --git a/.gitignore b/.gitignore index 106b5275..58220014 100644 --- a/.gitignore +++ b/.gitignore @@ -28,7 +28,7 @@ share/python-wheels/ MANIFEST # PyInstaller -# Usually these files are written by a python script from a template +# Usually these files are written by a python script from a master_template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec diff --git a/Dockerfile b/Dockerfile index ca88acf7..ffbb2fd1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,16 +17,20 @@ WORKDIR /opt/federation-lab RUN apt-get update \ && apt-get install -y vim curl python3 python3-pip net-tools iproute2 +# Add Pre-downloaded models (otherwise needs be run every-time) +ADD data/ data/ + # Use cache for pip, otherwise we repeatedly pull from repository ADD setup.py requirements.txt ./ RUN --mount=type=cache,target=/root/.cache/pip python3 -m pip install -r requirements.txt -ADD configs configs +# Add FLTK and configurations ADD fltk fltk -ADD scripts scripts +ADD configs configs # Expose default port 5000 to the host OS. EXPOSE 5000 # Update relevant runtime configuration for experiment -COPY configs/example_cloud_experiment.json configs/example_cloud_experiment.json \ No newline at end of file +COPY configs/ configs/ + diff --git a/README.md b/README.md index aa789703..949f0b0c 100644 --- a/README.md +++ b/README.md @@ -219,7 +219,48 @@ kustomize build common/istio-1-9/kubeflow-istio-resources/base | kubectl apply - ```bash kustomize build apps/pytorch-job/upstream/overlays/kubeflow | kubectl apply -f - ``` -#### +### Installing NFS +During the execution a `ReadWriteMany` persistent volume is needed. This is because each master worker node uses a +`SummaryWriter` to log the training progress. As such, multiple container on potentially different machines require +read-write access to this logging persistent volume. One way to resolve this, is to make use of Google Firestore (or +equivalent on your service provider of choice). However, using this incurs significant operating costs, as operation +starts at 1 TiB. As such, we setup a NFS on our cluster, which will provide this service to the cluster. + + +For this, we will make use of the `nfs-server-provisioner` Helm chart, that neatly wraps this functionality in an easy +to deploy chart. Make sure to install the NFS server in the same *namespace* as where you want to run your experiments. +```bash +helm install repo raphael https://raphaelmonrouzeau.github.io/charts/repository/ +helm update +helm install nfs-server raphael/nfs-server-provisioner --set persistence.enabled=true,persistence.storageClass=do-block-storage,persistence.size=20Gi +``` + +**N.B.** If you wish to use a volume as both **ReadWriteOnce** and **ReadOnlyMany**, GCE does not provide this service +You'll need to either create a **ReadWriteMany** PV with read only claims, or ensure that the writer completes before +the readers are spawned. + +### Setting up the Extractor + +This section only needs to be run once, as this will setup the TensorBoard service, as well as populate the data volume +that contains the DataSets. Including another will require you to either create a pod, or update the code and re-install +the Extractor chart/upgrade the chart. + +** N.B. ** Note that removing the Extractor chart will result in the deletion of the Persistent Volume Claims, i.e. this +will remove the data that is stored on these volumes. Make sure to COPY the contents of these directories to your local +file system before doing so. + +```bash +cd charts +helm install extractor -f values.yaml +``` + +Wait for it to deploy. + + +### Launching an experiment +We have now completed the setup of the project, and can continue by running actual experiments. If no errors occur, this +should. You may also skip this step and work on your own code, but it might be good to test your deployment +before running into trouble later. ## Known issues * Currently, there is no GPU support docker containers (or docker compose) diff --git a/charts/extractor/Chart.yaml b/charts/extractor/Chart.yaml new file mode 100644 index 00000000..3e64b092 --- /dev/null +++ b/charts/extractor/Chart.yaml @@ -0,0 +1,10 @@ +name: fltk-extractor +description: Helm Chart for running the Extractor for the FLTK framework +version: 0.1.0 +apiVersion: v1 +appVersion: 1.17.0 +keywords: + - extractor + - FLTK +sources: +home: diff --git a/charts/extractor/templates/fl-extractor-pod.yaml b/charts/extractor/templates/fl-extractor-pod.yaml new file mode 100644 index 00000000..cc5b91bd --- /dev/null +++ b/charts/extractor/templates/fl-extractor-pod.yaml @@ -0,0 +1,38 @@ +apiVersion: v1 +kind: Pod +metadata: + creationTimestamp: null + labels: + srv.fltk.extractor: fl-extractor + name: fl-extractor +spec: + containers: + - name: federation-lab-server + args: + - tensorboard + - --logdir + - logs + image: "{{ .Values.provider.domain }}/{{ .Values.provider.projectName }}/{{ .Values.provider.imageName }}" + ports: + - containerPort: 8443 + resources: + # Set the following values in case you want to limit the reserved resources. + # Whenever the pod exceeds these requirements, the Scheduler (Kubernetes) may kill + # the pod, resulting in startup. + requests: {} + limits: {} + volumeMounts: + - mountPath: /opt/federation-lab/output + name: fl-server-claim + readOnly: true # We mount output readOnly as this is for Orchestrator data. + - mountPath: /opt/federation-lab/log + name: fl-log-claim + readOnly: true # We mount log readOnly as this is for the Master worker nodes. + restartPolicy: OnFailure # Whenever the container crashes, restart. + volumes: + - name: fl-server-claim + persistentVolumeClaim: + claimName: fl-server-claim + - name: fl-log-claim + persistentVolumeClaim: + claimName: fl-log-claim diff --git a/charts/extractor/templates/fl-log-claim-persistentvolumeclaim.yaml b/charts/extractor/templates/fl-log-claim-persistentvolumeclaim.yaml new file mode 100644 index 00000000..d663929a --- /dev/null +++ b/charts/extractor/templates/fl-log-claim-persistentvolumeclaim.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + io.fltk.service: fl-log-claim + name: fl-log-claim +spec: + # Only the Extractor pod should be able to write to the part. Other pods 'simply' use it as read-only + accessModes: + - ReadWriteMany + resources: + requests: + storage: {{ .Values.extractor.logging.size }} + storageClassName: "nfs" diff --git a/charts/extractor/templates/fl-server-claim-persistentvolumeclaim.yaml b/charts/extractor/templates/fl-server-claim-persistentvolumeclaim.yaml new file mode 100644 index 00000000..42100310 --- /dev/null +++ b/charts/extractor/templates/fl-server-claim-persistentvolumeclaim.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + io.fltk.service: fl-server-claim + name: fl-server-claim +spec: + # Only the Orchestator pod should be able to write to the part. Extractor functions as a simple 'read' to + # Make datacollection easier. + accessModes: + - ReadWriteMany + resources: + requests: + storage: {{ .Values.extractor.logging.size }} + storageClassName: "nfs" diff --git a/charts/extractor/test.yaml b/charts/extractor/test.yaml deleted file mode 100644 index e9c7274f..00000000 --- a/charts/extractor/test.yaml +++ /dev/null @@ -1,38 +0,0 @@ -apiVersion: v1 -kind: Pod -metadata: - annotations: - kompose.cmd: kompose convert -f ../docker-compose.yml - kompose.version: 1.22.0 (HEAD) - creationTimestamp: null - labels: - io.kompose.service: fl-server - name: fl-extractor -spec: - containers: - - args: - - python3 - - -m - - fltk - - single - - configs/local_experiment.yaml - - --rank=0 - env: - - name: MASTER_PORT - value: "5000" - - name: PYTHONUNBUFFERED - value: "1" - image: gcr.io/cs4290-dml/fltk:latest - name: federation-lab-server - ports: - - containerPort: 5000 - resources: {} - volumeMounts: - - mountPath: /opt/federation-lab/output - name: fl-server-claim - restartPolicy: Never - volumes: - - name: fl-server-claim - persistentVolumeClaim: - claimName: fl-server-claim -status: {} diff --git a/charts/extractor/values.yaml b/charts/extractor/values.yaml new file mode 100644 index 00000000..6bfab688 --- /dev/null +++ b/charts/extractor/values.yaml @@ -0,0 +1,5 @@ +extractor: + dataset: + size: 2Gi + logging: + size: 5Gi \ No newline at end of file diff --git a/charts/federator/Chart.yaml b/charts/federator/Chart.yaml index 9643884d..3d40d0dc 100644 --- a/charts/federator/Chart.yaml +++ b/charts/federator/Chart.yaml @@ -1,9 +1,10 @@ name: ../docker-compose-gcloud description: Helm Chart for running the Federator/Orchestrator for the FLTK framework -version: 0.0.1 +version: 0.1.0 apiVersion: v1 -appVersion: 1.16.0 +appVersion: 1.17.0 keywords: - - ../docker-compose-gcloud + - Orchestrator + - FLTK sources: home: diff --git a/charts/federator/templates/fl-server-claim-persistentvolumeclaim.yaml b/charts/federator/templates/fl-server-claim-persistentvolumeclaim.yaml deleted file mode 100644 index 076cea03..00000000 --- a/charts/federator/templates/fl-server-claim-persistentvolumeclaim.yaml +++ /dev/null @@ -1,14 +0,0 @@ -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - creationTimestamp: null - labels: - io.kompose.service: fl-server-claim - name: fl-server-claim -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 20Gi -status: {} diff --git a/charts/federator/templates/fl-server-clusterrole.yaml b/charts/federator/templates/fl-server-clusterrole.yaml index 52291297..3b27acf9 100644 --- a/charts/federator/templates/fl-server-clusterrole.yaml +++ b/charts/federator/templates/fl-server-clusterrole.yaml @@ -2,6 +2,7 @@ kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 metadata: name: fl-server +# Currently we allow every action with this ClusterRole, these could be restricted if deemed necessary rules: - apiGroups: - '*' diff --git a/charts/federator/templates/fl-server-pod.yaml b/charts/federator/templates/fl-server-pod.yaml index e0cf3841..9444cb26 100644 --- a/charts/federator/templates/fl-server-pod.yaml +++ b/charts/federator/templates/fl-server-pod.yaml @@ -1,12 +1,8 @@ apiVersion: v1 kind: Pod metadata: - annotations: - kompose.cmd: kompose convert -f ../docker-compose.yml - kompose.version: 1.22.0 (HEAD) - creationTimestamp: null labels: - io.kompose.service: fl-server + fltk.service: fl-server name: fl-server spec: containers: @@ -18,14 +14,12 @@ spec: - configs/example_cloud_experiment.json - --rank=0 env: - - name: MASTER_PORT - value: {{ quote (.Values.fltk.port | int) }} - name: PYTHONUNBUFFERED value: "1" - image: localhost:5000/fltk + image: {{ .Values.provider.domain }}/{{ .Values.provider.projectName }}/{{ .Values.provider.imageName}} name: federation-lab-server ports: - - containerPort: {{ (.Values.fltk.port)}} + - containerPort: {{ (.Values.fltk.port) }} resources: limits: cpu: {{ (.Values.federator.cpu | int) }} @@ -33,6 +27,7 @@ spec: volumeMounts: - mountPath: /opt/federation-lab/output name: fl-server-claim + readOnly: true restartPolicy: Never volumes: - name: fl-server-claim diff --git a/charts/federator/templates/fl-server-service.yaml b/charts/federator/templates/fl-server-service.yaml index b693414b..69709780 100644 --- a/charts/federator/templates/fl-server-service.yaml +++ b/charts/federator/templates/fl-server-service.yaml @@ -1,12 +1,8 @@ apiVersion: v1 kind: Service metadata: - annotations: - kompose.cmd: kompose convert -c -f ../docker-compose-gcloud.yml - kompose.version: 1.22.0 (HEAD) - creationTimestamp: null labels: - io.kompose.service: fl-server + service: fl-server name: fl-server spec: ports: diff --git a/charts/fltk-values.yaml b/charts/fltk-values.yaml index 2dead84f..5184398d 100644 --- a/charts/fltk-values.yaml +++ b/charts/fltk-values.yaml @@ -1,4 +1,8 @@ fltk: worldsize: 50 config: cloud_experiment.yaml - port: 5000 \ No newline at end of file + port: 5000 +provider: + domain: gcr.io + projectName: test-bed-distml + imageName: fltk:latest diff --git a/charts/worker/templates/client-slow.yaml b/charts/worker/templates/client-slow.yaml index a18772f9..854edcbe 100644 --- a/charts/worker/templates/client-slow.yaml +++ b/charts/worker/templates/client-slow.yaml @@ -24,14 +24,10 @@ spec: - configs/cloud_experiment.yaml - --rank={{ . }} env: - - name: MASTER_PORT - value: "5000" - name: GLOO_SOCKET_IFNAME value: eth0 - name: PYTHONUNBUFFERED value: "1" - - name: RANK - value: {{ quote $rank }} - name: TP_SOCKET_IFNAME value: eth0 - name: WORLD_SIZE diff --git a/configs/example_cloud_experiment.json b/configs/example_cloud_experiment.json index 4a62a785..98d61f76 100644 --- a/configs/example_cloud_experiment.json +++ b/configs/example_cloud_experiment.json @@ -11,6 +11,7 @@ } }, "execution_config": { + "duration": 3600 "experiment_prefix": "cloud_experiment", "cuda": false, "tensorboard": { diff --git a/configs/tasks/example_arrival_config.json b/configs/tasks/example_arrival_config.json index 6ab5d10d..5207b9a4 100644 --- a/configs/tasks/example_arrival_config.json +++ b/configs/tasks/example_arrival_config.json @@ -2,13 +2,13 @@ "jobClassParameters": [ { "networkConfiguration": { - "network": "ExampleCNN", + "network": "FashionMNISTCNN", "dataset": "MNIST" }, "systemParameters": { "dataParallelism": "1", "executorCores": "1", - "executorMemory": "1G", + "executorMemory": "1Gi", "action": "train" }, "hyperParameters": { @@ -25,22 +25,19 @@ } ], "lambda": 0.004, - "preemptJobs": 0, - "runtime": 3600 + "preemptJobs": 0 }, { "jobClassParameters": [ { "networkConfiguration": { - "network": "ExampleCNN", + "network": "FashionMNISTResNet", "dataset": "MNIST" }, "systemParameters": { - "driverCores": "1", - "driverMemory": "1G", "dataParallelism": "1", "executorCores": "1", - "executorMemory": "1G", + "executorMemory": "1Gi", "action": "train" }, "hyperParameters": { @@ -56,13 +53,13 @@ }, { "networkConfiguration": { - "network": "ExampleCNN", + "network": "FashionMNISTCNN", "dataset": "MNIST" }, "systemParameters": { "dataParallelism": "1", "executorCores": "1", - "executorMemory": "1G", + "executorMemory": "1Gi", "action": "train" }, "hyperParameters": { @@ -79,8 +76,7 @@ } ], "lambda": 0.004, - "preemptJobs": 0, - "runtime": 3600 + "preemptJobs": 0 } ] diff --git a/fltk/__main__.py b/fltk/__main__.py index 0c923c97..4eb3f0bd 100644 --- a/fltk/__main__.py +++ b/fltk/__main__.py @@ -1,20 +1,20 @@ import json import logging from argparse import Namespace, ArgumentParser +from pathlib import Path -from dotenv import load_dotenv - -from fltk.launch import launch_client, launch_orchestrator -from fltk.util.config.arguments import create_client_parser, create_cluster_parser, extract_learning_parameters +from fltk.launch import launch_client, launch_orchestrator, launch_extractor +from fltk.util.config.arguments import create_client_parser, create_cluster_parser, extract_learning_parameters, \ + create_extractor_parser from fltk.util.config.base_config import BareConfig -def main(): +def __main__(): parser = ArgumentParser(description='Experiment launcher for the Federated Learning Testbed') subparsers = parser.add_subparsers(dest="mode") create_client_parser(subparsers) create_cluster_parser(subparsers) - + create_extractor_parser(subparsers) """ To create your own parser mirror the construction in the 'client_parser' object. Or refer to the ArgumentParser library documentation. @@ -23,7 +23,8 @@ def main(): arguments = parser.parse_args() with open(arguments.config, 'r') as config_file: - config = BareConfig.from_dict(json.load(config_file)) + config: BareConfig = BareConfig.from_dict(json.load(config_file)) + config.config_path = Path(arguments.config) if arguments.mode == 'cluster': logging.info("Starting in cluster mode.") @@ -33,6 +34,8 @@ def main(): client_start(arguments, config) logging.info("Stopping client...") exit(0) + elif arguments.mode == 'extractor': + launch_extractor(arguments, config) else: print("Provided mode is not supported...") exit(1) @@ -53,7 +56,4 @@ def client_start(args: Namespace, configuration: BareConfig): if __name__ == "__main__": - # Load dotenv with default values. However, the Pytorch-Operator should set the necessary - # environmental variables to get started. - load_dotenv() - main() + __main__() diff --git a/fltk/client.py b/fltk/client.py index 166733d6..d842a89d 100644 --- a/fltk/client.py +++ b/fltk/client.py @@ -1,29 +1,31 @@ import datetime import logging from pathlib import Path -from typing import Union, List +from typing import Union, List, Tuple +import numpy as np import torch import torch.distributed as dist -from sklearn.metrics import classification_report from sklearn.metrics import confusion_matrix +from torch.utils.tensorboard import SummaryWriter from fltk.nets.util.evaluation import calculate_class_precision, calculate_class_recall from fltk.nets.util.utils import save_model, load_model_from_file from fltk.schedulers import MinCapableStepLR +from fltk.schedulers.min_lr_step import LearningScheduler from fltk.util.config.arguments import LearningParameters from fltk.util.config.base_config import BareConfig from fltk.util.results import EpochData -class Client: +class Client(object): - def __init__(self, rank: int, task_id: int, world_size: int, config: BareConfig = None, + def __init__(self, rank: int, task_id: str, world_size: int, config: BareConfig = None, learning_params: LearningParameters = None): """ @param rank: PyTorch rank provided by KubeFlow setup. @type rank: int - @param task_id: String identifier representing the UID of the training task + @param task_id: String id representing the UID of the training task @type task_id: str @param config: @type config: @@ -33,7 +35,6 @@ def __init__(self, rank: int, task_id: int, world_size: int, config: BareConfig self._logger = logging.getLogger(f'Client-{rank}-{task_id}') self._logger.info("Initializing learning client") - self._logger.debug(f"Configuration received: {config}") self._id = rank self._world_size = world_size self._task_id = task_id @@ -48,18 +49,20 @@ def __init__(self, rank: int, task_id: int, world_size: int, config: BareConfig self.model = self.learning_params.get_model_class()() self.device = self._init_device() - self.optimizer = None - self.scheduler = None + self.optimizer: torch.optim.Optimizer + self.scheduler: LearningScheduler + self.tb_writer: SummaryWriter - def prepare_learner(self, distributed: bool = False, backend: Union[str, dist.Backend] = None): + def prepare_learner(self, distributed: bool = False, backend: Union[str, dist.Backend] = None) -> None: """ Function to prepare the learner, i.e. load all the necessary data into memory. - @param distributed: - @type distributed: - @param backend: - @type backend: - @return: - @rtype: + @param distributed: Indicates whether the execution must be run in Distributed fashion with DDP. + @type distributed: bool + @param backend: Which backend to use during training, needed when executing in distributed fashion, + for CPU execution the GLOO (default) backend must be used. For GPU execution, the NCCL execution is needed. + @type backend: dist.Backend + @return: None + @rtype: None """ self._logger.info(f"Preparing learner model with distributed={distributed}") self.model.to(self.device) @@ -76,6 +79,12 @@ def prepare_learner(self, distributed: bool = False, backend: Union[str, dist.Ba self.config.get_scheduler_gamma(), self.config.get_min_lr()) + self.tb_writer = SummaryWriter(str(self.config.get_log_path(self._task_id, self._id, self.learning_params.model))) + + def stop_learner(self): + self._logger.info(f"Tearing down Client {self._id}") + self.tb_writer.close() + def _init_device(self, cuda_device: torch.device = torch.device('cpu')): """ Initialize Torch to use available devices. Either prepares CUDA device, or disables CUDA during execution to run @@ -104,7 +113,7 @@ def load_default_model(self): default_model_path = Path(self.config.get_default_model_folder_path()).joinpath(model_file) load_model_from_file(self.model, default_model_path) - def train(self, epoch, log_interval: int = 100): + def train(self, epoch, log_interval: int = 50): """ Function to start training, regardless of DistributedDataParallel (DPP) or local training. DDP will account for synchronization of nodes. If extension requires to make use of torch.distributed.send and torch.distributed.recv @@ -150,17 +159,18 @@ def train(self, epoch, log_interval: int = 100): return final_running_loss - def test(self): + def test(self) -> Tuple[float, float, np.array, np.array, np.array]: correct = 0 total = 0 targets_ = [] pred_ = [] loss = 0.0 + # Disable gradient calculation, as we are only interested in predictions with torch.no_grad(): for (images, labels) in self.dataset.get_test_loader(): images, labels = images.to(self.device), labels.to(self.device) - dist.reduce + outputs = self.model(images) # Currently the FLTK framework assumes that a classification task is performed (hence max). # Future work may add support for non-classification training. @@ -174,19 +184,18 @@ def test(self): loss += self.loss_function(outputs, labels).item() accuracy = 100.0 * correct / total - confusion_mat = confusion_matrix(targets_, pred_) + confusion_mat: np.array = confusion_matrix(targets_, pred_) - class_precision = calculate_class_precision(confusion_mat) - class_recall = calculate_class_recall(confusion_mat) + class_precision: np.array = calculate_class_precision(confusion_mat) + class_recall: np.array = calculate_class_recall(confusion_mat) - self._logger.debug('Test set: Accuracy: {}/{} ({:.0f}%)'.format(correct, total, accuracy)) - self._logger.debug('Test set: Loss: {}'.format(loss)) - self._logger.debug("Classification Report:\n" + classification_report(targets_, pred_)) - self._logger.debug("Confusion Matrix:\n" + str(confusion_mat)) - self._logger.debug("Class precision: {}".format(str(class_precision))) - self._logger.debug("Class recall: {}".format(str(class_recall))) + # self._logger.debug('Test set: Accuracy: {}/{} ({:.0f}%)'.format(correct, total, accuracy)) + # self._logger.debug('Test set: Loss: {}'.format(loss)) + # self._logger.debug("Confusion Matrix:\n" + str(confusion_mat)) + # self._logger.debug("Class precision: {}".format(str(class_precision))) + # self._logger.debug("Class recall: {}".format(str(class_recall))) - return accuracy, loss, class_precision, class_recall + return accuracy, loss, class_precision, class_recall, confusion_mat def run_epochs(self) -> List[EpochData]: """ @@ -207,14 +216,15 @@ def run_epochs(self) -> List[EpochData]: train_time_ms = int(elapsed_time_train.total_seconds() * 1000) start_time_test = datetime.datetime.now() - accuracy, test_loss, class_precision, class_recall = self.test() + accuracy, test_loss, class_precision, class_recall, confusion_mat = self.test() elapsed_time_test = datetime.datetime.now() - start_time_test test_time_ms = int(elapsed_time_test.total_seconds() * 1000) data = EpochData(train_time_ms, test_time_ms, train_loss, accuracy, test_loss, class_precision, - class_recall, client_id=self._id) + confusion_mat, class_recall) epoch_results.append(data) + self.log_progress(data, epoch) return epoch_results def save_model(self, epoch): @@ -224,3 +234,14 @@ def save_model(self, epoch): """ self._logger.debug(f"Saving model to flat file storage. Saved at epoch #{epoch}") save_model(self.model, self.config.get_save_model_folder_path(), epoch) + + def log_progress(self, epoch_data: EpochData, epoch): + + + self.tb_writer.add_scalar('training loss per epoch', + epoch_data.loss_train, + epoch) + + self.tb_writer.add_scalar('accuracy per epoch', + epoch_data.accuracy, + epoch) diff --git a/fltk/datasets/cifar10.py b/fltk/datasets/cifar10.py index 8b6d763a..c4b4a888 100644 --- a/fltk/datasets/cifar10.py +++ b/fltk/datasets/cifar10.py @@ -21,7 +21,7 @@ def load_train_dataset(self, rank: int = 0, world_size: int = None): train_dataset = datasets.CIFAR10(root=self.config.get_data_path(), train=True, download=True, transform=transform) sampler = DistributedSampler(train_dataset, rank=rank, num_replicas=self.world_size) if self.world_size else None - train_loader = DataLoader(train_dataset, batch_size=self.learning_params.batch_size, sampler=sampler, + train_loader = DataLoader(train_dataset, batch_size=self.learning_params.bs, sampler=sampler, shuffle=(sampler is None)) return train_loader @@ -36,5 +36,5 @@ def load_test_dataset(self): transform=transform) sampler = DistributedSampler(test_dataset, rank=self.rank, num_replicas=self.world_size) if self.world_size else None - test_loader = DataLoader(test_dataset, batch_size=self.learning_params.batch_size, sampler=sampler) + test_loader = DataLoader(test_dataset, batch_size=self.learning_params.bs, sampler=sampler) return test_loader diff --git a/fltk/datasets/dataset.py b/fltk/datasets/dataset.py index 00f3f7e3..9e7f4489 100644 --- a/fltk/datasets/dataset.py +++ b/fltk/datasets/dataset.py @@ -6,9 +6,6 @@ class Dataset: - """ - TODO: Look into RPC memory leaks occuring due to https://github.com/pytorch/pytorch/issues/61920 - """ def __init__(self, config, learning_params, rank: int, world_size: int): self.config = config diff --git a/fltk/extractor.py b/fltk/extractor.py new file mode 100644 index 00000000..0c693cdc --- /dev/null +++ b/fltk/extractor.py @@ -0,0 +1,18 @@ +from argparse import Namespace + +from torchvision.datasets import FashionMNIST, CIFAR10, CIFAR100, MNIST + +from fltk.util.config import BareConfig + + +def download_datasets(args: Namespace, config: BareConfig): + # Prepare MNIST + mnist = MNIST(root=config.get_data_path(), download=True) + # Prepare Fashion MNIST + mnist = FashionMNIST(root=config.get_data_path(), download=True) + del mnist + # Prepare CIFAR10 + cifar10 = CIFAR10(root=config.get_data_path(), download=True) + del cifar10 + # Prepare CIFAR100 + cifar100 = CIFAR100(root=config.get_data_path(), download=True) diff --git a/fltk/launch.py b/fltk/launch.py index 870fe78e..9cd02724 100644 --- a/fltk/launch.py +++ b/fltk/launch.py @@ -6,6 +6,7 @@ import torch.distributed as dist from fltk.client import Client +from fltk.extractor import download_datasets from fltk.orchestrator import Orchestrator from fltk.util.cluster.client import ClusterManager from fltk.util.config.arguments import LearningParameters @@ -31,7 +32,7 @@ def is_distributed() -> bool: return dist.is_available() and world_size > 1 -def launch_client(task_id, config: BareConfig = None, learning_params: LearningParameters = None): +def launch_client(task_id: str, config: BareConfig = None, learning_params: LearningParameters = None): """ @param task_id: @type task_id: @@ -59,26 +60,31 @@ def launch_client(task_id, config: BareConfig = None, learning_params: LearningP def launch_orchestrator(args: Namespace = None, config: BareConfig = None): """ Default runner for the Orchestrator that is based on KubeFlow - @param args: - @type args: + @param args: Commandline arguments passed to the execution. Might be removed in a future commit. + @type args: Namespace @param config: Configuration for components, needed for spinning up components of the Orchestrator. @type config: BareConfig - @return: - @rtype: + @return: None + @rtype: None """ logging.info('Starting as Orchestrator') logging.info("Starting Orchestrator, initializing resources....") - orchestrator = Orchestrator(config) - cluster_manager = ClusterManager() + arrival_generator = ExperimentGenerator() + cluster_manager = ClusterManager() + orchestrator = Orchestrator(cluster_manager, arrival_generator, config) pool = ThreadPool(3) logging.info("Starting cluster manager") pool.apply_async(cluster_manager.start) logging.info("Starting arrival generator") - pool.apply_async(arrival_generator.run) + pool.apply_async(arrival_generator.start(config.get_duration())) logging.info("Starting orchestrator") pool.apply(orchestrator.run) pool.join() logging.info("Stopped execution of Orchestrator...") + + +def launch_extractor(args: Namespace, config: BareConfig): + download_datasets(args, config) diff --git a/fltk/nets/util/evaluation.py b/fltk/nets/util/evaluation.py index 15ed4cba..e990695b 100644 --- a/fltk/nets/util/evaluation.py +++ b/fltk/nets/util/evaluation.py @@ -1,14 +1,14 @@ import numpy as np -def calculate_class_precision(conf_mat: np.array): +def calculate_class_precision(conf_mat: np.array) -> np.array: """ Calculates the precision for each class from a confusion matrix. """ return np.diagonal(conf_mat) / np.sum(conf_mat, axis=0) -def calculate_class_recall(conf_mat: np.array): +def calculate_class_recall(conf_mat: np.array) -> np.array: """ Calculates the recall for each class from a confusion matrix. """ diff --git a/fltk/orchestrator.py b/fltk/orchestrator.py index 3d2b0f74..f03aa8c7 100644 --- a/fltk/orchestrator.py +++ b/fltk/orchestrator.py @@ -1,88 +1,56 @@ import logging import time +import uuid from queue import PriorityQueue from typing import List import kubernetes.config -from dataclass_csv import DataclassWriter -from kubernetes import client +from kubeflow.pytorchjob import PyTorchJobClient +from fltk.util.cluster.client import construct_job, ClusterManager from fltk.util.config.base_config import BareConfig -from fltk.util.results import EpochData from fltk.util.task.config.parameter import TrainTask +from fltk.util.task.generator.arrival_generator import ArrivalGenerator +from fltk.util.task.task import ArrivalTask class Orchestrator(object): """ - Central component of the Federated Learning System: The Federator + Central component of the Federated Learning System: The Orchestrator - The Federator is in charge of the following tasks: - - Have a copy of the global model - - Client selection - - Aggregating the client model weights/gradients - - Saving all the metrics - - Use tensorboard to report metrics + The Orchestrator is in charge of the following tasks: + - Running experiments + - Creating and/or managing tasks + - Keep track of progress (pending/started/failed/completed) - Keep track of timing + Note that the Orchestrator does not function like a Federator, in the sense that it keeps a central model, performs + aggregations and keeps track of Clients. For this, the KubeFlow PyTorch-Operator is used to deploy a train task as + a V1PyTorchJob, which automatically generates the required setup in the cluster. In addition, this allows more Jobs + to be scheduled, than that there are resources, as such, letting the Kubernetes Scheduler let decide when to run + which containers where. """ _alive = False # Priority queue, requires an orderable object, otherwise a Tuple[int, Any] can be used to insert. - pending_tasks: PriorityQueue[TrainTask] = PriorityQueue() - deployed_tasks: List[TrainTask] = [] + pending_tasks: "PriorityQueue[ArrivalTask]" = PriorityQueue() + deployed_tasks: List[ArrivalTask] = [] completed_tasks: List[str] = [] - def __init__(self, config: BareConfig = None): + def __init__(self, cluster_mgr: ClusterManager, arv_gen: ArrivalGenerator, config: BareConfig): self._logger = logging.getLogger('Orchestrator') - self._logger.debug("Loading in-cluster configuration") kubernetes.config.load_incluster_config() - self.config = config - self._v1 = client.CoreV1Api() - self._batch_api = client.BatchV1Api() - - def remote_run_epoch(self, ): - """ - @deprecated - """ - - # """ - # TODO: Implement poisioning by providing arguments to the different clients. - # Either the federator selects n nodes at the start, or a (configurable) function is selected, which - # determines to send to which nodes and which are poisoned - # """ - # responses.append((client, _remote_method_async(Client.run_epochs, client.ref, num_epoch=epochs))) + self._cluster_mgr = cluster_mgr + self.__arrival_generator = arv_gen + self._config = config - # TODO: Decide on how to combine logging in KubeFlow/Tensorboard/otherwise. - # res[0].tb_writer.add_scalar('training loss', - # epoch_data.loss_train, # for every 1000 minibatches - # self.epoch_counter * res[0].data_size) - # - # res[0].tb_writer.add_scalar('accuracy', - # epoch_data.accuracy, # for every 1000 minibatches - # self.epoch_counter * res[0].data_size) - # - # res[0].tb_writer.add_scalar('training loss per epoch', - # epoch_data.loss_train, # for every 1000 minibatches - # self.epoch_counter) - # - # res[0].tb_writer.add_scalar('accuracy per epoch', - # epoch_data.accuracy, # for every 1000 minibatches - # self.epoch_counter) - - def save_epoch_data(self): - file_output = f'./{self.config.output_location}' - # self.ensure_path_exists(file_output) - for key in self.client_data: - filename = f'{file_output}/{key}.csv' - self._logger.info(f'Saving data at {filename}') - with open(filename, "w") as f: - w = DataclassWriter(f, self.client_data[key], EpochData) - w.write() + # API to interact with the cluster. + self.__client = PyTorchJobClient() def stop(self) -> None: """ - + Stop the Orchestrator. @return: @rtype: """ @@ -91,27 +59,36 @@ def stop(self) -> None: def run(self) -> None: """ - Main loop of the Orchestrator + Main loop of the Orchestartor. :return: """ self._alive = True - while self._alive: + start_time = time.time() + while self._alive and time.time() - start_time < self._config.get_duration(): # 1. Check arrivals # If new arrivals, store them in arrival list - if not self.pending_tasks.empty(): - pass - - self._logger.info("Still alive...") + while not self.__arrival_generator.arrivals.empty(): + arrival: TrainTask = self.__arrival_generator.arrivals.get() + unique_identifier = uuid.uuid4() + task = ArrivalTask(id=unique_identifier, + network=arrival.network_configuration.network, + dataset=arrival.network_configuration.dataset, + sys_conf=arrival.system_parameters, + param_conf=arrival.hyper_parameters) + + self._logger.info(f"Arrival of: {task}") + self.pending_tasks.put(task) + while not self.pending_tasks.empty(): + # Do blocking request to priority queue + curr_task = self.pending_tasks.get() + self._logger.info(f"Scheduling arrival of Arrival: {curr_task}") + job_to_start = construct_job(self._config, curr_task) + + self.__client.create(job_to_start, namespace=self._config.cluster_config.namespace) + self.deployed_tasks.append(curr_task) + # TODO: Keep track of Jobs that were started, but may have completed.... + # That would conclude the MVP. + self._logger.debug("Still alive...") time.sleep(5) - # TODO: Implement run loop: - - # 2. If unscheduled tassk - # Take first / highest priority job - # Check for available resources in cluster, break if not - # Create Job description - # Spawn job - # 3. Check for job completion status - # 4. Record something? idk. - - logging.info(f'Federator is stopping') + logging.info(f'Experiment completed, currently does not support waiting.') diff --git a/fltk/schedulers/min_lr_step.py b/fltk/schedulers/min_lr_step.py index cecef6dd..ec795ab1 100644 --- a/fltk/schedulers/min_lr_step.py +++ b/fltk/schedulers/min_lr_step.py @@ -1,7 +1,15 @@ +import abc import logging -class MinCapableStepLR: +class LearningScheduler(abc.ABC): + + @abc.abstractmethod + def step(self): + raise NotImplementedError() + + +class MinCapableStepLR(LearningScheduler): def __init__(self, optimizer, step_size, gamma, min_lr): """ diff --git a/fltk/util/cluster/client.py b/fltk/util/cluster/client.py index 3154d3dc..1f980307 100644 --- a/fltk/util/cluster/client.py +++ b/fltk/util/cluster/client.py @@ -3,17 +3,20 @@ from collections import defaultdict from dataclasses import dataclass from multiprocessing.pool import ThreadPool -from typing import Dict +from typing import Dict, List +from uuid import UUID import schedule -from kubeflow.pytorchjob import V1PyTorchJob, V1ReplicaSpec +from kubeflow.pytorchjob import V1PyTorchJob, V1ReplicaSpec, V1PyTorchJobSpec from kubernetes import client, config -from kubernetes.client import BatchV1Api, V1Job, V1ObjectMeta, V1ResourceRequirements +from kubernetes.client import V1ObjectMeta, V1ResourceRequirements, V1Container, V1PodTemplateSpec, \ + V1VolumeMount, V1Toleration from torch.utils.tensorboard import SummaryWriter from fltk.util.cluster.conversion import Convert +from fltk.util.config import BareConfig from fltk.util.singleton import Singleton -from fltk.util.task.config.parameter import TrainTask +from fltk.util.task.task import ArrivalTask @dataclass @@ -40,10 +43,12 @@ class Resource: class BuildDescription: resources: V1ResourceRequirements - identifier: str - container: client.V1Container - template: client.V1PodTemplateSpec - spec: client.V1JobSpec + master_container: V1Container + worker_container: V1Container + master_template: V1PodTemplateSpec + worker_template: V1PodTemplateSpec + id: UUID + spec: V1PyTorchJobSpec class ResourceWatchDog: @@ -64,7 +69,7 @@ def __init__(self): Work should be based on the details listed here: https://github.com/scylladb/scylla-cluster-tests/blob/a7b09e69f0152a4d70bfb25ded3d75b7e7328acc/sdcm/cluster_k8s/__init__.py#L216-L223 """ - self._v1: client.CoreV1Api = None + self._v1: client.CoreV1Api self._logger = logging.getLogger('ResourceWatchDog') self._Q = Convert() @@ -102,7 +107,7 @@ def start(self) -> None: def __monitor_nodes(self) -> None: """ - Watchdog function that watches the Cluster resources in a K8s cluster. Requires the config to be set and loaded + Watchdog function that watches the Cluster resources in a K8s cluster. Requires the conf to be set and loaded prior to calling. @return: None @rtype: None @@ -129,8 +134,9 @@ def __monitor_pods(self) -> None: new_resource_mapper = {} self._logger.info("Fetching pod information of cluster...") - try: - for node_name, node in self._node_lookup.items(): + for node_name, node in self._node_lookup.items(): + try: + # Create field selector to only get active pods that 'request' memory selector = f'status.phase!=Succeeded,status.phase!=Failed,spec.nodeName={node_name}' # Select pods from all namespaces on specific Kubernetes node @@ -150,15 +156,20 @@ def __monitor_pods(self) -> None: mem_lim += self._Q(lmts["memory"]) resource = Resource(node_name, alloc_cpu, alloc_mem, core_req, mem_req, core_lim, mem_lim) new_resource_mapper[node_name] = resource - except Exception as e: - self._logger.error(f'Namespace lookup for {node_name} failed. Reason: {e}') + except Exception as e: + self._logger.error(f'Namespace lookup for {node_name} failed. Reason: {e}') self._resource_lookup = new_resource_mapper self._logger.debug(self._resource_lookup) class ClusterManager(metaclass=Singleton): - _alive = False + """ + Object to potentially further extend. This shows how the information of different Pods in a cluster can be + requested and parsed. Currently, it mainly exists to start the ResourceWatchDog, which now only logs the amount of + resources... + """ + __alive = False __threadpool: ThreadPool = None def __init__(self): @@ -168,13 +179,12 @@ def __init__(self): self._logger = logging.getLogger('ClusterManager') self._config = config.load_incluster_config() self._watchdog = ResourceWatchDog() - self._client_handler = ClientHandler() def start(self): self._logger.info("Spinning up cluster manager...") # Set debugging to WARNING only, as otherwise DEBUG statements will flood the logs. client.rest.logger.setLevel(logging.WARNING) - self._alive = True + self.__alive = True self.__thread_pool = ThreadPool(processes=2) self.__thread_pool.apply_async(self._watchdog.start) self.__thread_pool.apply_async(self._run) @@ -182,22 +192,17 @@ def start(self): def _stop(self): self._logger.info("Stopping execution of ClusterManager, halting components...") self._watchdog.stop() + self.__alive = False self.__thread_pool.join() self._logger.info("Successfully stopped execution of ClusterManager") def _run(self): - while self._alive: + while self.__alive: self._logger.info("Still alive...") time.sleep(10) self._stop() - def _schedulable_task(self, train_task: TrainTask): - current_resources = self._watchdog._resource_lookup - - def deploy_task(self): - train_task: TrainTask = None - class DeploymentBuilder: _buildDescription = BuildDescription() @@ -205,12 +210,8 @@ class DeploymentBuilder: def reset(self) -> None: self._buildDescription = BuildDescription() - def create_identifier(self, identifier) -> None: - # TODO: Move, or create identifier here. - self._buildDescription.identifier = identifier - @staticmethod - def __resource_dict(mem: str, cpu: str) -> Dict[str, str]: + def __resource_dict(mem: str, cpu: int) -> Dict[str, str]: """ Private helper function to create a resource dictionary for deployments. Currently only supports the creation of the requests/limits directory that is needed for a V1ResoruceRequirements object. @@ -222,47 +223,97 @@ def __resource_dict(mem: str, cpu: str) -> Dict[str, str]: @return: @rtype: """ - return {'memory': mem, 'cpu': cpu} + return {'memory': mem, 'cpu': str(cpu)} - def build_resources(self, mem_req, cpu_req, mem_lim, cpu_lim) -> None: - req_dict = self.__resource_dict(mem_req, cpu_req) - lim_dict = self.__resource_dict(mem_lim, cpu_lim) + def build_resources(self, arrival_task: ArrivalTask) -> None: + system_reqs = arrival_task.sys_conf + req_dict = self.__resource_dict(mem=system_reqs.executor_memory, + cpu=system_reqs.executor_cores) + # Currently the request is set to the limits. You may want to change this. self._buildDescription.resources = client.V1ResourceRequirements(requests=req_dict, - limits=lim_dict) - - def build_container(self, identifier: str = None) -> None: - self._buildDescription.container = client.V1Container( - name=f'client-test', - image='localhost:5000/fltk', - command=["python3", "fltk/launch.py", "single", - "configs/cloud_experiment.yaml"], - # TODO: Decide how to give client identifier. + limits=req_dict) + + def _generate_command(self, config: BareConfig, task: ArrivalTask): + command = (f'python3 -m client {config.config_path} {task.id} ' + f'--model {task.network} --dataset {task.dataset} ' + f'--optimizer Adam --max_epoch {task.param_conf.max_epoch} ' + f'--batch_size {task.param_conf.bs} --learning_rate {task.param_conf.lr} ' + f'--decay {task.param_conf.lr_decay} --loss CrossEntropy') + return command.split(' ') + + def _build_container(self, conf: BareConfig, task: ArrivalTask, name: str = "pytorch", + vol_mnts: List[V1VolumeMount] = None) -> V1Container: + return V1Container( + name=name, + image=conf.image, + command=self._generate_command(conf, task), args=['hello world'], image_pull_policy='Always', # Set the resources to the pre-generated resources - resources=self._buildDescription.resources + resources=self._buildDescription.resources, + volume_mounts=vol_mnts ) - def build_template(self, restart_policy='Never') -> None: - self._buildDescription.template = client.V1PodTemplateSpec( + def build_worker_container(self, conf: BareConfig, task: ArrivalTask, name: str = "pytorch") -> None: + self._buildDescription.worker_container = self._build_container(conf, task, name) + + def build_master_container(self, conf: BareConfig, task: ArrivalTask, name: str = "pytorch") -> None: + """ + Function to build the Master worker container. This requires the LOG PV to be mounted on the expected + logging directory. Make sure that any changes in the Helm charts are also reflected here. + @param image: + @type image: + @param name: + @type name: + @return: + @rtype: + """ + master_mounts: List[V1VolumeMount] = [V1VolumeMount( + mount_path=f'/opt/federation-lab/{conf.get_log_dir()}"', + name='fl-log-claim', + read_only=False + )] + self._buildDescription.master_container = self._build_container(conf, task, name, master_mounts) + + def build_container(self, task: ArrivalTask, conf: BareConfig): + self.build_master_container(conf, task) + self.build_worker_container(conf, task) + + def build_template(self) -> None: + """ + + @return: + @rtype: + """ + # TODO: Add support for tolerations to use only affinitity nodes to deploy to... + # Ensure with taints that + # https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/ + + """ + V1Toleration() + """ + V1Toleration() + self._buildDescription.master_template = client.V1PodTemplateSpec( metadata=client.V1ObjectMeta(labels={"app": "fltk-worker"}), - spec=client.V1PodSpec(restart_policy=restart_policy, - containers=[self._buildDescription.container])) - - def build_spec(self, worker_num: int = 0) -> None: - # TODO: Ensure ReadWriteMany PVC is mounted to the Master node. - replica_spec = {"Master": V1ReplicaSpec( - replicas=1, - restart_policy="OnFailure", - template=self._buildDescription.template - )} - - if worker_num > 0: - replica_spec['Worker'] = V1ReplicaSpec( - replicas=worker_num, - restart_policy="OnFailure", - template=self._buildDescription.template + spec=client.V1PodSpec(containers=[self._buildDescription.master_container])) + self._buildDescription.worker_template = client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta(labels={"app": "fltk-worker"}), + spec=client.V1PodSpec(containers=[self._buildDescription.worker_container])) + + def build_spec(self, task: ArrivalTask, restart_policy: str = 'OnFailure') -> None: + pt_rep_spec: Dict[str, V1ReplicaSpec] = \ + {"Master": V1ReplicaSpec( + replicas=1, + restart_policy=restart_policy, + template=self._buildDescription.master_template + )} + if task.sys_conf.data_parallelism > 1: + pt_rep_spec['Worker'] = V1ReplicaSpec( + replicas=task.sys_conf.data_parallelism - 1, + restart_policy=restart_policy, + template=self._buildDescription.worker_template ) + self._buildDescription.spec = V1PyTorchJobSpec(pytorch_replica_specs=pt_rep_spec) def construct(self) -> V1PyTorchJob: """ @@ -275,30 +326,30 @@ def construct(self) -> V1PyTorchJob: job = V1PyTorchJob( api_version="kubeflow.org/v1", kind="PyTorchJob", - metadata=V1ObjectMeta(name=self._buildDescription.identifier, namespace='kubeflow'), + metadata=V1ObjectMeta(name=self._buildDescription.id, namespace='kubeflow'), spec=self._buildDescription.spec) return job + def create_identifier(self, task: ArrivalTask): + self._buildDescription.id = task.id -def construct_job(): - dp_builder = DeploymentBuilder() - dp_builder.create_identifier("client_example") - dp_builder.build_resources("1024Mi", '1000m', "1024Mi", "1000m") - dp_builder.build_container() +def construct_job(conf: BareConfig, task: ArrivalTask) -> V1PyTorchJob: + """ + Function to build a Job, based on the specifications of an ArrivalTask, and the general configuration of the + BareConfig. + @param conf: configuration object that contains specifics to properly start a client. + @type conf: BareConfig + @param task: Learning task for which a job description must be made. + @type task: ArrivalTask + @return: KubeFlow compatible PyTorchJob description to create a Job with the requested system and hyper parameters. + @rtype: V1PyTorchJob + """ + dp_builder = DeploymentBuilder() + dp_builder.create_identifier(task) + dp_builder.build_resources(task) + dp_builder.build_container(task, conf) dp_builder.build_template() - dp_builder.build_spec() + dp_builder.build_spec(task) job = dp_builder.construct() - - -def deploy_job(api_instance: BatchV1Api, job: V1Job): - api_instance.create_namespaced_job(body=job, namespace="test") - - -class ClientHandler(object): - def __init__(self): - self._v1 = client.CoreV1Api() - - def deploy_client(self, description): - # API to exec with - k8s_apps_v1 = client.AppsV1Api() + return job diff --git a/fltk/util/cluster/conversion.py b/fltk/util/cluster/conversion.py index 7cea557b..5de29836 100644 --- a/fltk/util/cluster/conversion.py +++ b/fltk/util/cluster/conversion.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Union from pint import UnitRegistry @@ -17,7 +18,7 @@ def __init__(self, path: Path = None): else: self.__Registry = UnitRegistry(filename=str(self.CONVERSION_PATH)) - def __call__(self, value: str) -> int: + def __call__(self, value: Union[str, int]) -> int: """ Function to convert str representation of a CPU/memory quantity into an integer representation. For conversion metrics see `/configs/quantities/kubernetes.conf` diff --git a/fltk/util/config/arguments.py b/fltk/util/config/arguments.py index 40f20a9a..1f5d0546 100644 --- a/fltk/util/config/arguments.py +++ b/fltk/util/config/arguments.py @@ -4,18 +4,18 @@ import torch.nn -from fltk.datasets import CIFAR10Dataset, FashionMNISTDataset +from fltk.datasets import CIFAR10Dataset, FashionMNISTDataset, CIFAR100Dataset from fltk.datasets.dataset import Dataset from fltk.nets import Cifar100ResNet, Cifar100VGG, Cifar10CNN, Cifar10ResNet, FashionMNISTCNN, FashionMNISTResNet CLIENT_ARGS: List[Tuple[str, str, str, type]] = \ [("model", "md", "Which model to train", str), ("dataset", "ds", "Which dataset to train the model on", str), - ("batch_size", "bs", + ("bs", "bs", "Number that are 'batched' together in a single forward/backward pass during the optimization steps.", int), ("max_epoch", "ep", "Maximum number of times that the 'training' set instances can be used during the optimization steps", int), - ("learning_rate", "lr", "Factor to limit the step size that is taken during each gradient descent step.", float), + ("lr", "lr", "Factor to limit the step size that is taken during each gradient descent step.", float), ("decay", 'dc', "Rate at which the learning rate decreases (i.e. the optimization takes smaller steps", float), ("loss", 'ls', "Loss function to use for optimization steps", str), @@ -45,7 +45,7 @@ class LearningParameters: _available_data = { "Cifar10": CIFAR10Dataset, - "Cifar100": CIFAR10Dataset, + "Cifar100": CIFAR100Dataset, "FashionMnist": FashionMNISTDataset } @@ -57,7 +57,6 @@ class LearningParameters: "Adam": torch.optim.SGD } - def get_model_class(self) -> Type[torch.nn.Module]: return self._available_nets.get(self.model) @@ -90,16 +89,15 @@ def extract_learning_parameters(args: Namespace) -> LearningParameters: return LearningParameters(model, dataset, batch_size, epoch, lr, decay, loss, optimizer) +def create_extractor_parser(subparsers): + extractor_parser = subparsers.add_parser('extractor') + extractor_parser.add_argument('conf', type=str) + + def create_client_parser(subparsers) -> None: client_parser = subparsers.add_parser('client') - client_parser.add_argument('config', type=str) + client_parser.add_argument('conf', type=str) client_parser.add_argument('task_id', type=str) - # Option to override rank, by default provided by PytorchJob in Kubeflow. - client_parser.add_argument('--rank', type=int, default=None) - # Option to override default nic, by default is 'eth0' in containers. - client_parser.add_argument('--nic', type=str, default=None) - # Option to override 'master' host name, by default provided by PytorchJob in Kubeflow. - client_parser.add_argument('--host', type=str, default=None) # Add hyper-parameters for long, short, hlp, tpe in CLIENT_ARGS: @@ -108,4 +106,4 @@ def create_client_parser(subparsers) -> None: def create_cluster_parser(subparsers) -> None: cluster_parser = subparsers.add_parser('cluster') - cluster_parser.add_argument('config', type=str) + cluster_parser.add_argument('conf', type=str) diff --git a/fltk/util/config/base_config.py b/fltk/util/config/base_config.py index e64cc38c..32b90f56 100644 --- a/fltk/util/config/base_config.py +++ b/fltk/util/config/base_config.py @@ -38,7 +38,7 @@ def prepare_log_dir(self, working_dir: Path = None): used, as the TensorBoard instance that is started simultaneously with the Orchestrator. @param working_dir: Current working directory, by default PWD is assumed at which the Python interpreter is started. - @type working_dir: pathlib.Path + @type working_dir: Path @return: None @rtype: None """ @@ -56,12 +56,14 @@ class ExecutionConfig: reproducibility: ReproducibilityConfig tensorboard: TensorboardConfig + duration: int experiment_prefix: str = "experiment" cuda: bool = False default_model_folder_path = "default_models" epoch_save_end_suffix = "epoch_end" save_model_path = "models" data_path = "data" + log_path = "log" @dataclass_json @@ -84,14 +86,26 @@ class ClusterConfig: orchestrator: OrchestratorConfig client: ClientConfig wait_for_clients: bool = True + namespace: str = 'test' @dataclass_json @dataclass class BareConfig(object): - # Configuration parameters for PyTorch and models that are generated. execution_config: ExecutionConfig cluster_config: ClusterConfig = field(metadata=config(field_name="cluster")) + config_path: Path = None + + def get_duration(self) -> int: + return self.execution_config.duration + + def get_log_dir(self): + return self.execution_config.log_path + + def get_log_path(self, experiment_id: str, client_id: int, network_name: str) -> Path: + base_log = Path(self.execution_config.tensorboard.record_dir) + experiment_dir = Path(f"{self.execution_config.experiment_prefix}_{client_id}_{network_name}_{experiment_id}") + return base_log.joinpath(experiment_dir) def get_scheduler_step_size(self) -> int: return self.execution_config.general_net.scheduler_step_size diff --git a/fltk/util/results.py b/fltk/util/results.py index 0af4ce5b..d0556f61 100644 --- a/fltk/util/results.py +++ b/fltk/util/results.py @@ -1,5 +1,6 @@ from dataclasses import dataclass -from typing import Any + +import numpy as np @dataclass @@ -9,8 +10,9 @@ class EpochData: loss_train: float accuracy: float loss: float - class_precision: Any - class_recall: Any + class_precision: np.array + class_recall: np.array + confusion_mat: np.array client_id: str = None def to_csv_line(self): diff --git a/fltk/util/task/config/parameter.py b/fltk/util/task/config/parameter.py index 23a12563..3bddae37 100644 --- a/fltk/util/task/config/parameter.py +++ b/fltk/util/task/config/parameter.py @@ -12,15 +12,15 @@ class HyperParameters: """ Learning HyperParameters. - batch_size: Number of images that are used during each forward/backward phase. + bs: Number of images that are used during each forward/backward phase. max_epoch: Number of times epochs are executed. - learning_rate: Learning rate parameter, limiting the step size in the gradient update. - learning_rate_decay: How fast the learning rate 'shrinks'. + lr: Learning rate parameter, limiting the step size in the gradient update. + lr_decay: How fast the learning rate 'shrinks'. """ - batch_size: int = field(metadata=config(field_name="batchSize")) + bs: int = field(metadata=config(field_name="batchSize")) max_epoch: int = field(metadata=config(field_name="maxEpoch")) - learning_rate: str = field(metadata=config(field_name="learningRate")) - learning_rate_decay: str = field(metadata=config(field_name="learningrateDecay")) + lr: str = field(metadata=config(field_name="learningRate")) + lr_decay: str = field(metadata=config(field_name="learningrateDecay")) @dataclass_json @@ -80,14 +80,10 @@ class JobDescription: Currently, the arrival statistics is the lambda value used in a Poisson arrival process. preemtible_jobs: indicates whether the jobs can be pre-emptively rescheduled by the scheduler. - # TODO: Decide whether we want to pull out some of the configurations out of this JSON parser,\ - # To prevent variable duplication in the experiment descriptino files. - runtime: indicates for how long jobs should be generated. """ job_class_parameters: List[JobClassParameter] = field(metadata=config(field_name="jobClassParameters")) arrival_statistic: float = field(metadata=config(field_name="lambda")) preemtible_jobs: float = field(metadata=config(field_name="preemptJobs")) - runtime: int @dataclass(order=True) @@ -103,8 +99,9 @@ class TrainTask: system_parameters: SystemParameters = field(compare=False) hyper_parameters: HyperParameters = field(compare=False) arrival_ticks: float = field(compare=False) + identifier: str = field(compare=False) - def __init__(self, job_parameters: JobClassParameter, priority: Priority, task_id: str): + def __init__(self, identity: str, job_parameters: JobClassParameter, priority: Priority): """ Overridden init method for dataclass, to allow for 'exploding' a JobDescription object to a flattened object. @param job_parameters: @@ -114,6 +111,7 @@ def __init__(self, job_parameters: JobClassParameter, priority: Priority, task_i @param priority: @type priority: """ + self.identifier = identity self.network_configuration = job_parameters.network_configuration self.system_parameters = job_parameters.system_parameters self.hyper_parameters = job_parameters.hyper_parameters @@ -127,7 +125,7 @@ def __init__(self, config_path: Path): def parse(self) -> List[JobDescription]: """ - Parse function to load JSON config into JobDescription objects. Any changes to the JSON file format + Parse function to load JSON conf into JobDescription objects. Any changes to the JSON file format should be reflected by the classes used. For more information refer to the dataclasses JSON documentation https://pypi.org/project/dataclasses-json/. """ diff --git a/fltk/util/task/generator/arrival_generator.py b/fltk/util/task/generator/arrival_generator.py index da9963d2..d8e547c6 100644 --- a/fltk/util/task/generator/arrival_generator.py +++ b/fltk/util/task/generator/arrival_generator.py @@ -1,9 +1,9 @@ import logging -import random from abc import abstractmethod from asyncio import sleep from dataclasses import dataclass from pathlib import Path +from queue import Queue from random import choices from time import time from typing import Dict, List @@ -21,6 +21,7 @@ class ArrivalGenerator(metaclass=Singleton): """ configuration_path: Path logger: logging.Logger = None + arrivals = Queue() @abstractmethod def load_config(self): @@ -53,7 +54,6 @@ class ExperimentGenerator(ArrivalGenerator): _tick_list: List[Arrival] = [] _alive: bool = False _decrement = 1 - __default_config: Path = Path('configs/example_cloud_experiment.json') def __init__(self, custom_config: Path = None): @@ -70,17 +70,6 @@ def set_logger(self, name: str = None): logging_name = name or self.__class__.__name__ self.logger = logging.getLogger(logging_name) - def set_seed(self, seed: int = 42): - """ - Function to pre-set the seed used by the Experiment generator, this allows for better reproducability of the - experiments. - @param seed: Seed to be used by the `random` library for experiment generation - @type seed: int - @return: - @rtype: - """ - random.seed(seed) - def load_config(self): """ Generate @@ -92,20 +81,20 @@ def load_config(self): def generate_arrival(self, task_id: str) -> None: """ Generate a training task for a JobDescription once the inter-arrival time has been 'deleted'. - @param train_id: identifier for a training task correspnoding to the JobDescription. + @param train_id: id for a training task correspnoding to the JobDescription. @type train_id: String """ - # TODO: logging + self.logger.info(f"Creating task for {task_id}") job = self.job_description[task_id] parameters = choices(job.job_class_parameters, [param.probability for param in job.job_class_parameters])[0] priority = choices(parameters.priorities, [prio.probabilities for prio in parameters.priorities], k=1)[0] inter_arrival_ticks = np.random.poisson(lam=job.arrival_statistic) - train_task = TrainTask(parameters, priority, task_id) + train_task = TrainTask(task_id, parameters, priority) self._tick_list.append(Arrival(inter_arrival_ticks, train_task, task_id)) - def start(self): + def start(self, duration: float): """ Function to start arrival generator, requires to @return: @@ -115,13 +104,13 @@ def start(self): self.set_logger() self.logger.info("Starting execution of arrival generator...") self._alive = True - self.run() + self.run(duration) def stop(self) -> None: self.logger.info("Received stopping signal") self._alive = False - def run(self, **kwargs): + def run(self, duration: float): """ Run function to generate arrivals during existence of the Orchestrator. WIP. @@ -130,18 +119,16 @@ def run(self, **kwargs): @rtype: """ self.start_time = time() - while self._alive: - arrived = [] + while self._alive and time() - self.start_time < duration: save_time = time() for indx, entry in enumerate(self._tick_list): entry.ticks -= self._decrement if entry.ticks <= 0: self._tick_list.pop(indx) - arrived.append(entry) + self.arrivals.put(entry) self.generate_arrival(entry.task_id) - # Correct for time drift between execution, otherwise drift adds up, and arrivals don't generate correctly correction_time = time() - save_time sleep(self._decrement - correction_time) self.stop_time = time() - self.logger.info(f"Stopped execution at: {self.stop_time}, duration: {self.stop_time - self.start_time}") + self.logger.info(f"Stopped execution at: {self.stop_time}, duration: {self.stop_time - self.start_time}/{duration}") diff --git a/fltk/util/task/task.py b/fltk/util/task/task.py index 47611c54..6bc1b567 100644 --- a/fltk/util/task/task.py +++ b/fltk/util/task/task.py @@ -1,11 +1,12 @@ import abc from dataclasses import dataclass +from uuid import UUID -from fltk.util.cluster.task.config.parameter import SystemParameters, HyperParameters +from fltk.util.task.config.parameter import SystemParameters, HyperParameters -@dataclass -class Task(abc): +@dataclass(order=True) +class ArrivalTask(abc): """ Object to contain configuration of training task. It describes the following properties; * Number of machines @@ -14,7 +15,9 @@ class Task(abc): * Dataset * Hyper-parameters """ + id: UUID + priority: int = field(st) network: str dataset: str - system_config: SystemParameters - parameter_config: HyperParameters + sys_conf: SystemParameters + param_conf: HyperParameters diff --git a/requirements.txt b/requirements.txt index 2c71f6c5..136406dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,47 +1,64 @@ absl-py==0.12.0 +aiohttp==3.7.4.post0 +async-timeout==3.0.1 +attrs==21.2.0 cachetools==4.2.2 certifi==2020.12.5 chardet==4.0.0 +colorful==0.5.4 dataclass-csv==1.3.0 dataclasses-json==0.5.4 +fsspec==2021.7.0 +future==0.18.2 google-auth==1.30.0 google-auth-oauthlib==0.4.4 grpcio==1.37.1 idna==2.10 iteration-utilities==0.11.0 joblib==1.0.1 +kubeflow-pytorchjob==0.1.3 kubernetes==17.17.0 Markdown==3.3.4 marshmallow==3.13.0 marshmallow-enum==1.5.1 memory-profiler==0.58.0 +multidict==5.1.0 mypy-extensions==0.4.3 numpy==1.20.2 oauthlib==3.1.0 packaging==21.0 +pandas==1.3.2 Pillow==8.2.0 Pint==0.17 +prettyprinter==0.18.0 protobuf==3.16.0 psutil==5.8.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 +pyDeprecate==0.3.1 +Pygments==2.10.0 pyparsing==2.4.7 python-dateutil==2.8.2 python-dotenv==0.17.1 +pytorch-lightning==1.4.4 +pytz==2021.1 PyYAML==5.4.1 requests==2.25.1 requests-oauthlib==1.3.0 +retrying==1.3.3 rsa==4.7.2 schedule==1.1.0 scikit-learn==0.23.2 scipy==1.6.3 six==1.16.0 stringcase==1.2.0 +table-logger==0.3.6 tensorboard==2.5.0 tensorboard-data-server==0.6.1 tensorboard-plugin-wit==1.8.0 threadpoolctl==2.1.0 torch==1.7.1 +torchmetrics==0.5.0 torchsummary==1.5.1 torchvision==0.8.2 tqdm==4.49.0 @@ -50,4 +67,4 @@ typing-inspect==0.7.1 urllib3==1.26.4 websocket-client==1.2.0 Werkzeug==1.0.1 -kubeflow-pytorchjob>=0.1.3 \ No newline at end of file +yarl==1.6.3