Make federated learning default
JMGaljaard committed Mar 31, 2022
1 parent 2a976d4 commit c735aad
Showing 1 changed file with 77 additions and 58 deletions.

fltk/util/cluster/client.py
@@ -3,7 +3,7 @@
from collections import defaultdict
from dataclasses import dataclass
from multiprocessing.pool import ThreadPool
-from typing import Dict, List, Tuple, Optional, OrderedDict
+from typing import Dict, List, Tuple, Optional, OrderedDict, Union
from uuid import UUID

import schedule
@@ -191,73 +191,92 @@ def _run(self):
        self._stop()


+def _generate_command(config: DistributedConfig, task: ArrivalTask, federated=True) -> List[str]:
+    """
+    Function to generate commands for containers to start working with. Either a federated learning command
+    will be realized, or a distributed learning command. Note that distributed learning commands will be revised
+    in an upcoming version of KFLTK.
+    @param config: Distributed configuration of the deployment, providing the path to the configuration file.
+    @type config: DistributedConfig
+    @param task: Arrival task describing the learning job to launch.
+    @type task: ArrivalTask
+    @param federated: Whether to generate a federated (default) or a distributed learning command.
+    @type federated: bool
+    @return: Command for the container to execute, split on spaces.
+    @rtype: List[str]
+    """
+    if not federated:
+        command = (f'python3 -m fltk client {config.config_path} {task.id} '
+                   f'--model {task.network} --dataset {task.dataset} '
+                   f'--optimizer Adam --max_epoch {task.param_conf.max_epoch} '
+                   f'--batch_size {task.param_conf.bs} --learning_rate {task.param_conf.lr} '
+                   f'--decay {task.param_conf.lr_decay} --loss CrossEntropy '
+                   f'--backend gloo')
+    else:
+        command = 'python3 -m fltk remote'
+    return command.split(' ')
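
For illustration, here is roughly what this helper emits. This is a minimal sketch that substitutes SimpleNamespace stand-ins for the real DistributedConfig and ArrivalTask objects; all field values are made up:

from types import SimpleNamespace

# Stand-ins for DistributedConfig / ArrivalTask (illustrative values only).
param_conf = SimpleNamespace(max_epoch=10, bs=64, lr=0.01, lr_decay=0.95)
task = SimpleNamespace(id='task-1', network='Cifar10CNN', dataset='cifar10', param_conf=param_conf)
config = SimpleNamespace(config_path='configs/example.json')

print(_generate_command(config, task, federated=True))
# ['python3', '-m', 'fltk', 'remote']
print(_generate_command(config, task, federated=False)[:6])
# ['python3', '-m', 'fltk', 'client', 'configs/example.json', 'task-1']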


+def _build_typed_container(conf: DistributedConfig, cmd: List[str], resources: V1ResourceRequirements,
+                           name: str = "pytorch", requires_mount: bool = False) -> V1Container:
+    """
+    Function to build the Master worker container. This requires the LOG PV to be mounted on the expected
+    logging directory. Make sure that any changes in the Helm charts are also reflected here.
+    @param conf: Distributed configuration providing the container image and logging directory.
+    @type conf: DistributedConfig
+    @param cmd: Command for the container to run on startup, as generated by _generate_command.
+    @type cmd: List[str]
+    @param resources: Pre-generated resource requirements to attach to the container.
+    @type resources: V1ResourceRequirements
+    @param name: Name of the container.
+    @type name: str
+    @param requires_mount: Whether the logging Persistent Volume claim must be mounted in the container.
+    @type requires_mount: bool
+    @return: Container description ready to be deployed.
+    @rtype: V1Container
+    """
+    mount_list: Optional[List[V1VolumeMount]] = None
+    if requires_mount:
+        mount_list: List[V1VolumeMount] = [V1VolumeMount(
+            mount_path=f'/opt/federation-lab/{conf.get_log_dir()}',
+            name='fl-log-claim',
+            read_only=False
+        )]
+
+    container = V1Container(name=name,
+                            image=conf.cluster_config.image,
+                            command=cmd,
+                            image_pull_policy='Always',
+                            # Set the resources to the pre-generated resources
+                            resources=resources,
+                            volume_mounts=mount_list)
+    return container
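
To see how this fits together, the following sketch builds a container with a SimpleNamespace stand-in for DistributedConfig; the image name, log directory, and resource values are made up:

from types import SimpleNamespace
from kubernetes.client import V1ResourceRequirements

conf = SimpleNamespace(cluster_config=SimpleNamespace(image='fltk:latest'),
                       get_log_dir=lambda: 'logging')
req = {'memory': '2Gi', 'cpu': '1'}
resources = V1ResourceRequirements(requests=req, limits=req)
container = _build_typed_container(conf, ['python3', '-m', 'fltk', 'remote'], resources,
                                   requires_mount=True)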


+def _resource_dict(mem: Union[str, int], cpu: Union[str, int]) -> Dict[str, str]:
+    """
+    Private helper function to create a resource dictionary for deployments. Currently only supports the creation
+    of the requests/limits dictionary that is needed for a V1ResourceRequirements object.
+    @param mem: Memory Request/Limit for a Container's ResourceRequirement.
+    @type mem: Union[str, int]
+    @param cpu: CPU Request/Limit for a Container's ResourceRequirement.
+    @type cpu: Union[str, int]
+    @return: Dictionary with the 'memory' and 'cpu' entries formatted as strings.
+    @rtype: Dict[str, str]
+    """
+    return {'memory': f'{mem}', 'cpu': f'{cpu}'}
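
The returned dictionary is exactly the shape the Kubernetes Python client expects for requests and limits; a small usage sketch with illustrative values:

from kubernetes.client import V1ResourceRequirements

req = _resource_dict(mem='2Gi', cpu=2)   # {'memory': '2Gi', 'cpu': '2'}
resources = V1ResourceRequirements(requests=req, limits=req)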


class DeploymentBuilder:
    _buildDescription = BuildDescription()

    def reset(self) -> None:
        del self._buildDescription
        self._buildDescription = BuildDescription()

-    @staticmethod
-    def __resource_dict(mem: str, cpu: int) -> Dict[str, str]:
-        """
-        Private helper function to create a resource dictionary for deployments. Currently only supports the creation
-        of the requests/limits directory that is needed for a V1ResoruceRequirements object.
-        @param mem: Memory Request/Limit for a Container's ResoruceRequirement
-        @type mem:
-        @param cpu: CPU Request/Limit for a Container's ResoruceRequirement
-        @type cpu:
-        @return:
-        @rtype:
-        """
-        return {'memory': mem, 'cpu': str(cpu)}

    def build_resources(self, arrival_task: ArrivalTask) -> None:
        system_reqs = arrival_task.named_system_params()
        for tpe, sys_reqs in system_reqs.items():
-            typed_req_dict = self.__resource_dict(mem=sys_reqs.executor_memory,
-                                                  cpu=sys_reqs.executor_cores)
+            typed_req_dict = _resource_dict(mem=sys_reqs.executor_memory,
+                                            cpu=sys_reqs.executor_cores)
            # Currently the request is set to the limits. You may want to change this.
            self._buildDescription.resources[tpe] = client.V1ResourceRequirements(requests=typed_req_dict,
                                                                                  limits=typed_req_dict)
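
A note on the comment above: because each request is set equal to its limit, the resulting pods qualify for the Kubernetes Guaranteed QoS class, which makes them the least likely to be evicted under node pressure; lowering the requests below the limits would trade that guarantee for denser scheduling.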

-    def _generate_command(self, config: DistributedConfig, task: ArrivalTask):
-        command = (f'python3 -m fltk client {config.config_path} {task.id} '
-                   f'--model {task.network} --dataset {task.dataset} '
-                   f'--optimizer Adam --max_epoch {task.param_conf.max_epoch} '
-                   f'--batch_size {task.param_conf.bs} --learning_rate {task.param_conf.lr} '
-                   f'--decay {task.param_conf.lr_decay} --loss CrossEntropy '
-                   f'--backend gloo')
-        return command.split(' ')
-
-    def _build_typed_container(self, conf: DistributedConfig, cmd: str, resources: V1ResourceRequirements,
-                               name: str = "pytorch", requires_mount: bool = False) -> V1Container:
-        """
-        Function to build the Master worker container. This requires the LOG PV to be mounted on the expected
-        logging directory. Make sure that any changes in the Helm charts are also reflected here.
-        @param name:
-        @type name:
-        @return:
-        @rtype:
-        """
-        mount_list: Optional[List[V1VolumeMount]] = None
-        if requires_mount:
-            mount_list: List[V1VolumeMount] = [V1VolumeMount(
-                mount_path=f'/opt/federation-lab/{conf.get_log_dir()}',
-                name='fl-log-claim',
-                read_only=False
-            )]
-
-        container = V1Container(name=name,
-                                image=conf.cluster_config.image,
-                                command=cmd,
-                                image_pull_policy='Always',
-                                # Set the resources to the pre-generated resources
-                                resources=resources,
-                                volume_mounts=mount_list)
-        return container

-    def build_container(self, task: DistributedArrivalTask, conf: DistributedConfig):
+    def build_container(self, task: ArrivalTask, conf: DistributedConfig):
"""
Function to build container descriptions for deploying from within an Orchestrator pod.
@param task:
Expand All @@ -268,12 +287,12 @@ def build_container(self, task: DistributedArrivalTask, conf: DistributedConfig)
@rtype:
"""
        # TODO: Implement cmd / config reference.
-        cmd = None
+        cmd = _generate_command(conf, task)
        tpe, curr_resource = self._buildDescription.resources.items()[0]
-        self._buildDescription.typed_containers[tpe] = self._build_typed_container(conf, cmd, curr_resource,
-                                                                                   requires_mount=True)
+        self._buildDescription.typed_containers[tpe] = _build_typed_container(conf, cmd, curr_resource,
+                                                                              requires_mount=True)
        for tpe, curr_resource in self._buildDescription.resources.items()[1:]:
-            self._buildDescription.typed_containers[tpe] = self._build_typed_container(conf, cmd, curr_resource)
+            self._buildDescription.typed_containers[tpe] = _build_typed_container(conf, cmd, curr_resource)

    def build_tolerations(self, tols: List[Tuple[str, Optional[str], str, str]] = None):
        if not tols:
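Putting the pieces together, the intended call sequence appears to be the one below. This is a hedged sketch inferred only from the methods shown in this diff; task and conf are assumed to be an ArrivalTask and DistributedConfig constructed elsewhere in the Orchestrator:

builder = DeploymentBuilder()
builder.reset()
builder.build_resources(task)        # one V1ResourceRequirements per executor type
builder.build_container(task, conf)  # one V1Container per resource type, the first with the log mount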
