diff --git a/Dockerfile b/Dockerfile
index bee5f1be..b678aade 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -25,4 +25,5 @@ RUN --mount=type=cache,target=/root/.cache/pip python3 -m pip install -r require
 # Add FLTK and configurations
 ADD fltk fltk
 ADD configs configs
+ADD experiments experiments
 ADD charts charts
diff --git a/charts/orchestrator/templates/fl-server-pod.yaml b/charts/orchestrator/templates/fl-server-pod.yaml
index 7b3186e1..b0569d18 100644
--- a/charts/orchestrator/templates/fl-server-pod.yaml
+++ b/charts/orchestrator/templates/fl-server-pod.yaml
@@ -32,12 +32,12 @@ spec:
           name: fl-server-claim
           readOnly: true
         - mountPath: /opt/federation-lab/config
-          name: fl-server-config
+          name: fltk-orchestrator-config
   restartPolicy: Never
   volumes:
     - name: fl-server-claim
       persistentVolumeClaim:
         claimName: fl-server-claim
-    - name: fl-server-config
+    - name: fltk-orchestrator-config
       configMap:
-        name: fltk-experiment-config
+        name: fltk-orchestrator-config
diff --git a/configs/tasks/example_arrival_config.json b/configs/tasks/example_arrival_config.json
index 745f4dca..5a2e1340 100644
--- a/configs/tasks/example_arrival_config.json
+++ b/configs/tasks/example_arrival_config.json
@@ -9,17 +9,13 @@
     "systemParameters": {
       "dataParallelism": null,
       "configurations": {
-        "master": {
+        "Master": {
           "cores": "1000m",
           "memory": "1Gi"
         },
-        "slow": {
+        "Worker": {
           "cores": "750m",
           "memory": "1Gi"
-        },
-        "medium": {
-          "cores": "1500m",
-          "memory": "1.5Gi"
         }
       }
     },
@@ -40,9 +36,8 @@
         }
       },
       "configurations": {
-        "master": null,
-        "slow": null,
-        "medium": {
+        "Master": null,
+        "Worker": {
           "learningRate": "0.05"
         }
       }
@@ -70,9 +65,8 @@
         430
       ],
       "workerReplication": {
-        "master": 1,
-        "slow": 2,
-        "medium": 1
+        "Master": 1,
+        "Worker": 2
       }
     }
   }
diff --git a/experiments/node.config.yaml b/experiments/node.config.yaml
new file mode 100644
index 00000000..d6fbbf3a
--- /dev/null
+++ b/experiments/node.config.yaml
@@ -0,0 +1,23 @@
+batch_size: 128
+test_batch_size: 1000
+rounds: 5
+lr: 0.01
+momentum: 0.1
+cuda: False
+shuffle: True
+scheduler_step_size: 50
+scheduler_gamma: 0.5
+min_lr: 1e-10
+rng_seed: 42
+optimizer: SGD
+optimizer_args: {'lr': 0.01, 'momentum': 0.1}
+distributed: true
+single_machine: false
+aggregration: FedAvg
+dataset_name: mnist
+net_name: FashionMNISTCNN
+data_sampler: uniform
+data_sampler_args: [0.07, 42]
+replication_id: 1
+real_time: true
+save_data_append: true
\ No newline at end of file
diff --git a/fltk/__main__.py b/fltk/__main__.py
index cccb8d20..57418eb2 100644
--- a/fltk/__main__.py
+++ b/fltk/__main__.py
@@ -24,8 +24,10 @@

 def _save_get(args, param) -> Optional[Any]:
+    a = None
     if args is not None and hasattr(args, param):
-        return args.__dict__[param]
+        a = args.__dict__[param]
+    print(f"gotten {param} resulted in {a}")
     return None

@@ -58,6 +60,7 @@ def __main__():
         print('No configuration path is provided.')

     # TODO: move kwargs into function as extractor
+
     __run_op_dict[args.action](arg_path, conf_path,
                                rank=_save_get(args, 'rank'),
                                parser=parser,
diff --git a/fltk/core/client.py b/fltk/core/client.py
index 8141411d..5f3d8787 100644
--- a/fltk/core/client.py
+++ b/fltk/core/client.py
@@ -18,14 +18,14 @@ def __init__(self, id: str, rank: int, world_size: int, config: Config):
         self.loss_function = self.config.get_loss_function()()
         self.optimizer = get_optimizer(self.config.optimizer)(self.net.parameters(),
                                                               **self.config.optimizer_args)
-        self.scheduler = MinCapableStepLR(self.logger, self.optimizer,
+        self.scheduler = MinCapableStepLR(self.optimizer,
                                           self.config.scheduler_step_size,
                                           self.config.scheduler_gamma,
                                           self.config.min_lr)

     def remote_registration(self):
         self.logger.info('Sending registration')
-        self.message('federator', 'ping', 'new_sender', be_weird=True)
+        self.message('federator', 'ping', 'new_sender')
         self.message('federator', 'register_client', self.id, self.rank)
         self.running = True
         self._event_loop()
diff --git a/fltk/core/distributed/orchestrator.py b/fltk/core/distributed/orchestrator.py
index 3afa9a9f..58cb3c2e 100644
--- a/fltk/core/distributed/orchestrator.py
+++ b/fltk/core/distributed/orchestrator.py
@@ -28,12 +28,12 @@ def _prepare_experiment_maps(task: FederatedArrivalTask, uuid, replication: int
     tpe_dict = collections.OrderedDict()
     name_dict = collections.OrderedDict()
     for tpe in task.type_map.keys():
-        name = f'{uuid}_{tpe}_{replication}'
+        name = str(f'{tpe}-{uuid}-{replication}').lower()
         meta = V1ObjectMeta(name=name,
                            labels={'app.kubernetes.io/name': f"fltk.node.config.{tpe}"})
         # TODO: Replication / seed information
         filled_template = template.render(task=task, tpe=tpe, replication=replication, seed=42)
-        tpe_dict[tpe] = V1ConfigMap(data={'node.config.json': filled_template}, metadata=meta)
+        tpe_dict[tpe] = V1ConfigMap(data={'node.config.yaml': filled_template}, metadata=meta)
         name_dict[tpe] = name
     return tpe_dict, name_dict

@@ -68,8 +68,8 @@ def __init__(self, cluster_mgr: ClusterManager, arv_gen: ArrivalGenerator, confi
         self._config = config

         # API to interact with the cluster.
-        self.__client = PyTorchJobClient()
-        self.__v1 = client.CoreV1Api()
+        self._client = PyTorchJobClient()
+        self._v1 = client.CoreV1Api()

     def stop(self) -> None:
         """
@@ -118,7 +118,7 @@ def run(self, clear: bool = True) -> None:

                 # Hack to overcome limitation of KubeFlow version (Made for older version of Kubernetes)
                 self.__logger.info(f"Deploying on cluster: {curr_task.id}")
-                self.__client.create(job_to_start, namespace=self._config.cluster_config.namespace)
+                self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
                 self.deployed_tasks.append(curr_task)

                 # TODO: Extend this logic in your real project, this is only meant for demo purposes
@@ -185,7 +185,7 @@ def run_federated(self, clear: bool = True) -> None:
                 self.__create_config_maps(config_dict)
                 # Hack to overcome limitation of KubeFlow version (Made for older version of Kubernetes)
                 self.__logger.info(f"Deploying on cluster: {curr_task.id}")
-                self.__client.create(job_to_start, namespace=self._config.cluster_config.namespace)
+                self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
                 self.deployed_tasks.append(curr_task)

                 # TODO: Extend this logic in your real project, this is only meant for demo purposes
@@ -208,11 +208,11 @@ def __clear_jobs(self):
         namespace = self._config.cluster_config.namespace
         self.__logger.info(f'Clearing old jobs in current namespace: {namespace}')

-        for job in self.__client.get(namespace=self._config.cluster_config.namespace)['items']:
+        for job in self._client.get(namespace=self._config.cluster_config.namespace)['items']:
             job_name = job['metadata']['name']
             self.__logger.info(f'Deleting: {job_name}')
             try:
-                self.__client.custom_api.delete_namespaced_custom_object(
+                self._client.custom_api.delete_namespaced_custom_object(
                     PYTORCHJOB_GROUP,
                     PYTORCHJOB_VERSION,
                     namespace,
@@ -223,6 +223,6 @@
                 print(e)

     def __create_config_maps(self, config_maps: Dict[str, V1ConfigMap]):
-        for _, config_map in config_maps.values():
-            self.__v1.create_namespaced_config_map(self._config.cluster_config.namespace,
-                                                   config_map)
\ No newline at end of file
+        for _, config_map in config_maps.items():
+            self._v1.create_namespaced_config_map(self._config.cluster_config.namespace,
+                                                  config_map)
\ No newline at end of file
diff --git a/fltk/core/federator.py b/fltk/core/federator.py
index 7faa8578..7f1aaf00 100644
--- a/fltk/core/federator.py
+++ b/fltk/core/federator.py
@@ -54,7 +54,7 @@ def create_clients(self):
         if self.config.single_machine:
             # Create direct clients
             world_size = self.config.num_clients + 1
-            for client_id in range(1, self.config.num_clients+ 1):
+            for client_id in range(1, self.config.world_size):
                 client_name = f'client{client_id}'
                 client = Client(client_name, client_id, world_size, copy.deepcopy(self.config))
                 self.clients.append(LocalClient(client_name, client, 0, DataContainer(client_name, self.config.output_path,
diff --git a/fltk/launch.py b/fltk/launch.py
index 425fe13c..c80fdb57 100644
--- a/fltk/launch.py
+++ b/fltk/launch.py
@@ -172,7 +172,7 @@ def _retrieve_or_init_env(nic=None, host=None):

 def _retrieve_env_config():
-    rank, world_size, port = os.environ.get('RANK'), os.environ.get('WORLD_SIZE'), os.environ["MASTER_PORT"]
+    rank, world_size, port = int(os.environ.get('RANK')), int(os.environ.get('WORLD_SIZE')), int(os.environ["MASTER_PORT"])
     return rank, world_size, port

@@ -194,12 +194,15 @@ def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=Non
     config = Config.FromYamlFile(config_path)
     config.world_size = config.num_clients + 1
     config.replication_id = prefix
-    if not (nic and host):
+    if rank and not (nic and host):
+        print("Getting parameters from configuration file")
         nic, host = _retrieve_network_params_from_config(config, nic, host)
         _retrieve_or_init_env(nic, host)
-    elif not (nic and host):
+    elif not rank:
+        print("Retrieving environmental configurations!")
         rank, world_size, master_port = _retrieve_env_config()
-        assert world_size == config.world_size
+        print(f"Retrieved: rank {rank} w_s {world_size} m_p {master_port}")
+        config.world_size = world_size
     else:
         print('Missing rank, host, world-size, checking environment!')
         parser.print_help()
diff --git a/fltk/samplers/__init__.py b/fltk/samplers/__init__.py
index d808d4ff..e66bd864 100644
--- a/fltk/samplers/__init__.py
+++ b/fltk/samplers/__init__.py
@@ -35,7 +35,7 @@ def get_sampler(dataset, args):
             sampler = DirichletSampler(dataset, num_replicas=args.get_world_size(),
                                        rank=args.get_rank(), args=args.get_sampler_args())
         else:  # default
-            logger.warning("Unknown sampler " + method + ", using uniform instead")
+            logger.warning(f"Unknown sampler {method}, using uniform instead")
             sampler = UniformSampler(dataset, num_replicas=args.get_world_size(),
                                      rank=args.get_rank())
     return sampler
diff --git a/fltk/util/cluster/client.py b/fltk/util/cluster/client.py
index 452dd140..3a4c4bf8 100644
--- a/fltk/util/cluster/client.py
+++ b/fltk/util/cluster/client.py
@@ -214,12 +214,12 @@ def _generate_command(config: DistributedConfig, task: ArrivalTask, federated=Tr
                    f'--decay {task.param_conf.lr_decay} --loss CrossEntropy '
                    f'--backend gloo')
     else:
-        command = (f'python3 -m fltk remote')
+        command = (f'python3 -m fltk remote experiments/node.config.yaml')
     return command.split(' ')

 def _build_typed_container(conf: DistributedConfig, cmd: List[str], resources: V1ResourceRequirements,
-                           name: str = "pytorch", requires_mount: bool = False) -> V1Container:
+                           name: str = "pytorch", requires_mount: bool = False, experiment_name: str = None) -> V1Container:
     """
     Function to build the Master worker container. This requires the LOG PV to be mounted on the expected
     logging directory. Make sure that any changes in the Helm charts are also reflected here.
@@ -230,11 +230,18 @@
     """
     mount_list: Optional[List[V1VolumeMount]] = []
     if requires_mount:
-        mount_list: List[V1VolumeMount] = [V1VolumeMount(
+        mount_list.append(V1VolumeMount(
             mount_path=f'/opt/federation-lab/{conf.get_log_dir()}',
             name='fl-log-claim',
             read_only=False
-        )]
+        ))
+    # TODO: Mount volume
+
+    # mount_list.append(V1VolumeMount(
+    #     mount_path=f'/opt/federation-lab/experiments',
+    #     name='experiment',
+    #     read_only=True
+    # ))
     # Create mount for configuration
     container = V1Container(name=name,
                             image=conf.cluster_config.image,
@@ -277,7 +284,7 @@ def build_resources(self, arrival_task: ArrivalTask) -> None:
             self._buildDescription.resources[tpe] = client.V1ResourceRequirements(requests=typed_req_dict,
                                                                                   limits=typed_req_dict)

-    def build_container(self, task: ArrivalTask, conf: DistributedConfig):
+    def build_container(self, task: ArrivalTask, conf: DistributedConfig, config_name_dict: Optional[Dict[str, str]]):
         """
         Function to build container descriptions for deploying from within an Orchestrator pod.
         @param task:
@@ -291,7 +298,8 @@ def build_container(self, task: ArrivalTask, conf: DistributedConfig):
         cmd = _generate_command(conf, task)
         for indx, (tpe, curr_resource) in enumerate(self._buildDescription.resources.items()):
             self._buildDescription.typed_containers[tpe] = _build_typed_container(conf, cmd, curr_resource,
-                                                                                  requires_mount=not indx)
+                                                                                  requires_mount=not indx,
+                                                                                  experiment_name=config_name_dict[tpe])

     def build_tolerations(self, tols: List[Tuple[str, Optional[str], str, str]] = None):
         if not tols:
@@ -313,22 +321,16 @@ def build_template(self, config_name_dict: Optional[Dict[str, str]]) -> None:
         # Ensure with taints that
         # https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
-        master_volumes = \
+        volumes = \
             [V1Volume(name="fl-log-claim",
                       persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name='fl-log-claim'))
              ]
-        if config_name_dict:
-            for tpe, tpe_config_map_name in config_name_dict.items():
-                V1Volume(name=tpe_config_map_name,
-                         config_map=V1ConfigMapVolumeSource(tpe_config_map_name,
-                                                            items=[V1KeyToPath(
-                                                                key='experiment.yaml',
-                                                                path='configs/experiment.yaml')]
-                                                            ))
+        # if config_name_dict:
+        #     for tpe, tpe_config_map_name in config_name_dict.items():
+        #         volumes.append(V1Volume(name='experiment',
+        #                                 config_map=V1ConfigMapVolumeSource(tpe_config_map_name)))

         for tpe, container in self._buildDescription.typed_containers.items():
             # TODO: Make this less hardcody
-            volumes = master_volumes if 'Master' in tpe else None
-
             self._buildDescription.typed_templates[tpe] = \
                 client.V1PodTemplateSpec(
                     metadata=client.V1ObjectMeta(labels={"app": "fltk-worker"}),
@@ -384,7 +386,7 @@ def construct_job(conf: DistributedConfig, task: DistributedArrivalTask,
     dp_builder = DeploymentBuilder()
     dp_builder.create_identifier(task)
     dp_builder.build_resources(task)
-    dp_builder.build_container(task, conf)
+    dp_builder.build_container(task, conf, config_name_dict)
     dp_builder.build_tolerations()
     dp_builder.build_template(config_name_dict)
     dp_builder.build_spec(task)
diff --git a/fltk/util/config/arguments.py b/fltk/util/config/arguments.py
index 8c66c946..70b68ea3 100644
--- a/fltk/util/config/arguments.py
+++ b/fltk/util/config/arguments.py
@@ -184,7 +184,7 @@ def create_remote_parser(subparsers) -> None:
     remote_parser = subparsers.add_parser('remote')
     add_default_arguments(remote_parser)

-    remote_parser.add_argument('rank', type=int, default=None)
+    remote_parser.add_argument('rank', nargs='?', type=int, default=None)
     remote_parser.add_argument('--nic', type=str, default=None)
     remote_parser.add_argument('--host', type=str, default=None)
diff --git a/fltk/util/task/config/parameter.py b/fltk/util/task/config/parameter.py
index 5a6d88fd..d5f65a10 100644
--- a/fltk/util/task/config/parameter.py
+++ b/fltk/util/task/config/parameter.py
@@ -1,7 +1,7 @@
 import json
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import List, Optional, OrderedDict, Any, Union, Tuple, Type
+from typing import List, Optional, OrderedDict, Any, Union, Tuple, Type, Dict

 import torch
 from dataclasses_json import dataclass_json, LetterCase, config
@@ -39,7 +39,7 @@ class HyperParameterConfiguration:
     test_bs: Optional[int] = field(metadata=config(field_name="testBatchSize"), default_factory=_none_factory)
     lr_decay: Optional[float] = field(metadata=config(field_name="learningRateDecay"), default_factory=_none_factory)

-    def merge_default(self, other: dict[str, Any]):
+    def merge_default(self, other: Dict[str, Any]):
         """
         Function to merge a HyperParameterConfiguration object with a default configuration
         @param other: