Update repo to work with static configuration file for experiments
JMGaljaard committed Apr 11, 2022
1 parent dd6918e commit 4418a4c
Showing 13 changed files with 82 additions and 56 deletions.
1 change: 1 addition & 0 deletions Dockerfile
@@ -25,4 +25,5 @@ RUN --mount=type=cache,target=/root/.cache/pip python3 -m pip install -r require
# Add FLTK and configurations
ADD fltk fltk
ADD configs configs
ADD experiments experiments
ADD charts charts
6 changes: 3 additions & 3 deletions charts/orchestrator/templates/fl-server-pod.yaml
@@ -32,12 +32,12 @@ spec:
name: fl-server-claim
readOnly: true
- mountPath: /opt/federation-lab/config
name: fl-server-config
name: fltk-orchestrator-config
restartPolicy: Never
volumes:
- name: fl-server-claim
persistentVolumeClaim:
claimName: fl-server-claim
- name: fl-server-config
- name: fltk-orchestrator-config
configMap:
name: fltk-experiment-config
name: fltk-orchestrator-config
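For orientation, a minimal sketch of the equivalent objects built with the kubernetes Python client (the same V1Volume / V1VolumeMount / V1ConfigMapVolumeSource types that appear in fltk/util/cluster/client.py below); this is only an illustration of what the chart change amounts to, not code from the repository:

# Illustration only: the pod template now mounts a configMap named
# fltk-orchestrator-config under /opt/federation-lab/config.
from kubernetes.client import V1ConfigMapVolumeSource, V1Volume, V1VolumeMount

config_volume = V1Volume(
    name='fltk-orchestrator-config',
    config_map=V1ConfigMapVolumeSource(name='fltk-orchestrator-config'))
config_mount = V1VolumeMount(
    mount_path='/opt/federation-lab/config',
    name='fltk-orchestrator-config')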
18 changes: 6 additions & 12 deletions configs/tasks/example_arrival_config.json
@@ -9,17 +9,13 @@
"systemParameters": {
"dataParallelism": null,
"configurations": {
"master": {
"Master": {
"cores": "1000m",
"memory": "1Gi"
},
"slow": {
"Worker": {
"cores": "750m",
"memory": "1Gi"
},
"medium": {
"cores": "1500m",
"memory": "1.5Gi"
}
}
},
@@ -40,9 +36,8 @@
}
},
"configurations": {
"master": null,
"slow": null,
"medium": {
"Master": null,
"Worker": {
"learningRate": "0.05"
}
}
@@ -70,9 +65,8 @@
430
],
"workerReplication": {
"master": 1,
"slow": 2,
"medium": 1
"Master": 1,
"Worker": 2
}
}
}
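The node types collapse from master/slow/medium to Master/Worker, and per-type hyper-parameter overrides keep their merge-with-defaults semantics (see merge_default in fltk/util/task/config/parameter.py below). A small sketch of my reading of those semantics, with hypothetical default values:

# Hypothetical defaults, for illustration only: "Master": null falls back
# entirely to the defaults, while "Worker" overrides just the fields it sets.
defaults = {"learningRate": "0.01", "batchSize": 128}
overrides = {"Master": None, "Worker": {"learningRate": "0.05"}}

def effective(node_type: str) -> dict:
    merged = dict(defaults)
    merged.update(overrides.get(node_type) or {})
    return merged

print(effective("Master"))  # {'learningRate': '0.01', 'batchSize': 128}
print(effective("Worker"))  # {'learningRate': '0.05', 'batchSize': 128}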
23 changes: 23 additions & 0 deletions experiments/node.config.yaml
@@ -0,0 +1,23 @@
batch_size: 128
test_batch_size: 1000
rounds: 5
lr: 0.01
momentum: 0.1
cuda: False
shuffle: True
scheduler_step_size: 50
scheduler_gamma: 0.5
min_lr: 1e-10
rng_seed: 42
optimizer: SGD
optimizer_args: {'lr': 0.01, 'momentum': 0.1}
distributed: true
single_machine: false
aggregration: FedAvg
dataset_name: mnist
net_name: FashionMNISTCNN
data_sampler: uniform
data_sampler_args: [0.07, 42]
replication_id: 1
real_time: true
save_data_append: true
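The launcher reads this file through Config.FromYamlFile (see fltk/launch.py below). Purely as an illustration of the file's shape, the same fields can be inspected with PyYAML:

# Illustration only; the actual code path is Config.FromYamlFile(config_path).
import yaml

with open("experiments/node.config.yaml") as fp:
    node_conf = yaml.safe_load(fp)

print(node_conf["optimizer"], node_conf["optimizer_args"])  # SGD {'lr': 0.01, 'momentum': 0.1}
print(node_conf["rounds"], node_conf["batch_size"])         # 5 128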
5 changes: 4 additions & 1 deletion fltk/__main__.py
@@ -24,8 +24,10 @@


def _save_get(args, param) -> Optional[Any]:
a = None
if args is not None and hasattr(args, param):
return args.__dict__[param]
a = args.__dict__[param]
print(f"gotten {param} resulted in {a}")
return None


@@ -58,6 +60,7 @@ def __main__():
print('No configuration path is provided.')

# TODO: move kwargs into function as extractor

__run_op_dict[args.action](arg_path, conf_path,
rank=_save_get(args, 'rank'),
parser=parser,
4 changes: 2 additions & 2 deletions fltk/core/client.py
@@ -18,14 +18,14 @@ def __init__(self, id: str, rank: int, world_size: int, config: Config):
self.loss_function = self.config.get_loss_function()()
self.optimizer = get_optimizer(self.config.optimizer)(self.net.parameters(),
**self.config.optimizer_args)
self.scheduler = MinCapableStepLR(self.logger, self.optimizer,
self.scheduler = MinCapableStepLR(self.optimizer,
self.config.scheduler_step_size,
self.config.scheduler_gamma,
self.config.min_lr)

def remote_registration(self):
self.logger.info('Sending registration')
self.message('federator', 'ping', 'new_sender', be_weird=True)
self.message('federator', 'ping', 'new_sender')
self.message('federator', 'register_client', self.id, self.rank)
self.running = True
self._event_loop()
22 changes: 11 additions & 11 deletions fltk/core/distributed/orchestrator.py
@@ -28,12 +28,12 @@ def _prepare_experiment_maps(task: FederatedArrivalTask, uuid, replication: int
tpe_dict = collections.OrderedDict()
name_dict = collections.OrderedDict()
for tpe in task.type_map.keys():
name = f'{uuid}_{tpe}_{replication}'
name = str(f'{tpe}-{uuid}-{replication}').lower()
meta = V1ObjectMeta(name=name,
labels={'app.kubernetes.io/name': f"fltk.node.config.{tpe}"})
# TODO: Replication / seed information
filled_template = template.render(task=task, tpe=tpe, replication=replication, seed=42)
tpe_dict[tpe] = V1ConfigMap(data={'node.config.json': filled_template}, metadata=meta)
tpe_dict[tpe] = V1ConfigMap(data={'node.config.yaml': filled_template}, metadata=meta)
name_dict[tpe] = name
return tpe_dict, name_dict

@@ -68,8 +68,8 @@ def __init__(self, cluster_mgr: ClusterManager, arv_gen: ArrivalGenerator, confi
self._config = config

# API to interact with the cluster.
self.__client = PyTorchJobClient()
self.__v1 = client.CoreV1Api()
self._client = PyTorchJobClient()
self._v1 = client.CoreV1Api()

def stop(self) -> None:
"""
@@ -118,7 +118,7 @@ def run(self, clear: bool = True) -> None:

# Hack to overcome limitation of KubeFlow version (Made for older version of Kubernetes)
self.__logger.info(f"Deploying on cluster: {curr_task.id}")
self.__client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.append(curr_task)

# TODO: Extend this logic in your real project, this is only meant for demo purposes
@@ -185,7 +185,7 @@ def run_federated(self, clear: bool = True) -> None:
self.__create_config_maps(config_dict)
# Hack to overcome limitation of KubeFlow version (Made for older version of Kubernetes)
self.__logger.info(f"Deploying on cluster: {curr_task.id}")
self.__client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.append(curr_task)

# TODO: Extend this logic in your real project, this is only meant for demo purposes
@@ -208,11 +208,11 @@ def __clear_jobs(self):
namespace = self._config.cluster_config.namespace
self.__logger.info(f'Clearing old jobs in current namespace: {namespace}')

for job in self.__client.get(namespace=self._config.cluster_config.namespace)['items']:
for job in self._client.get(namespace=self._config.cluster_config.namespace)['items']:
job_name = job['metadata']['name']
self.__logger.info(f'Deleting: {job_name}')
try:
self.__client.custom_api.delete_namespaced_custom_object(
self._client.custom_api.delete_namespaced_custom_object(
PYTORCHJOB_GROUP,
PYTORCHJOB_VERSION,
namespace,
@@ -223,6 +223,6 @@ def __clear_jobs(self):
print(e)

def __create_config_maps(self, config_maps: Dict[str, V1ConfigMap]):
for _, config_map in config_maps.values():
self.__v1.create_namespaced_config_map(self._config.cluster_config.namespace,
config_map)
for _, config_map in config_maps.items():
self._v1.create_namespaced_config_map(self._config.cluster_config.namespace,
config_map)
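Two small correctness points hide in this file: iterating config_maps.items() (rather than .values()) is what makes the `_, config_map` unpacking valid, and dropping the double leading underscore on __client/__v1 avoids Python's per-class name mangling. A minimal, hypothetical illustration of the latter:

# Hypothetical classes, illustration only: __attr is mangled to _Base__attr,
# so code outside Base (including subclasses) cannot reach it by that name,
# while a single underscore stays a plain attribute.
class Base:
    def __init__(self):
        self.__private = 1   # stored as _Base__private
        self._shared = 2     # stored as _shared

class Child(Base):
    def read(self):
        return self._shared  # works; self.__private would raise AttributeError

print(Child().read())  # 2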
2 changes: 1 addition & 1 deletion fltk/core/federator.py
@@ -54,7 +54,7 @@ def create_clients(self):
if self.config.single_machine:
# Create direct clients
world_size = self.config.num_clients + 1
for client_id in range(1, self.config.num_clients+ 1):
for client_id in range(1, self.config.world_size):
client_name = f'client{client_id}'
client = Client(client_name, client_id, world_size, copy.deepcopy(self.config))
self.clients.append(LocalClient(client_name, client, 0, DataContainer(client_name, self.config.output_path,
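Since world_size is set to num_clients + 1 just above, range(1, self.config.world_size) still enumerates exactly the client ids; a trivial check:

# Quick check of the loop-bound change: num_clients = 3 -> client ids 1..3.
num_clients = 3
world_size = num_clients + 1
print(list(range(1, world_size)))  # [1, 2, 3]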
11 changes: 7 additions & 4 deletions fltk/launch.py
@@ -172,7 +172,7 @@ def _retrieve_or_init_env(nic=None, host=None):


def _retrieve_env_config():
rank, world_size, port = os.environ.get('RANK'), os.environ.get('WORLD_SIZE'), os.environ["MASTER_PORT"]
rank, world_size, port = int(os.environ.get('RANK')), int(os.environ.get('WORLD_SIZE')), int(os.environ["MASTER_PORT"])
return rank, world_size, port


@@ -194,12 +194,15 @@ def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=Non
config = Config.FromYamlFile(config_path)
config.world_size = config.num_clients + 1
config.replication_id = prefix
if not (nic and host):
if rank and not (nic and host):
print("Getting parameters from configuration file")
nic, host = _retrieve_network_params_from_config(config, nic, host)
_retrieve_or_init_env(nic, host)
elif not (nic and host):
elif not rank:
print("Retrieving environmental configurations!")
rank, world_size, master_port = _retrieve_env_config()
assert world_size == config.world_size
print(f"Retrieved: rank {rank} w_s {world_size} m_p {master_port}")
config.world_size = world_size
else:
print('Missing rank, host, world-size, checking environment!')
parser.print_help()
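Environment variables are always strings, so the added int(...) casts in _retrieve_env_config are what allow the later comparison against config.world_size to hold. A small sketch with hypothetical values:

# Hypothetical values, illustration only: RANK / WORLD_SIZE / MASTER_PORT are
# the variables _retrieve_env_config reads; without the int casts the
# world_size assertion would compare an int against a str and fail.
import os

os.environ.update({"RANK": "1", "WORLD_SIZE": "4", "MASTER_PORT": "29500"})
rank = int(os.environ.get("RANK"))
world_size = int(os.environ.get("WORLD_SIZE"))
port = int(os.environ["MASTER_PORT"])
assert (rank, world_size, port) == (1, 4, 29500)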
2 changes: 1 addition & 1 deletion fltk/samplers/__init__.py
@@ -35,7 +35,7 @@ def get_sampler(dataset, args):
sampler = DirichletSampler(dataset, num_replicas=args.get_world_size(), rank=args.get_rank(),
args=args.get_sampler_args())
else: # default
logger.warning("Unknown sampler " + method + ", using uniform instead")
logger.warning(f"Unknown sampler {method}, using uniform instead")
sampler = UniformSampler(dataset, num_replicas=args.get_world_size(), rank=args.get_rank())

return sampler
38 changes: 20 additions & 18 deletions fltk/util/cluster/client.py
@@ -214,12 +214,12 @@ def _generate_command(config: DistributedConfig, task: ArrivalTask, federated=Tr
f'--decay {task.param_conf.lr_decay} --loss CrossEntropy '
f'--backend gloo')
else:
command = (f'python3 -m fltk remote')
command = (f'python3 -m fltk remote experiments/node.config.yaml')
return command.split(' ')


def _build_typed_container(conf: DistributedConfig, cmd: List[str], resources: V1ResourceRequirements,
name: str = "pytorch", requires_mount: bool = False) -> V1Container:
name: str = "pytorch", requires_mount: bool = False, experiment_name: str = None) -> V1Container:
"""
Function to build the Master worker container. This requires the LOG PV to be mounted on the expected
logging directory. Make sure that any changes in the Helm charts are also reflected here.
@@ -230,11 +230,18 @@ def _build_typed_container(conf: DistributedConfig, cmd: List[str], resources: V
"""
mount_list: Optional[List[V1VolumeMount]] = []
if requires_mount:
mount_list: List[V1VolumeMount] = [V1VolumeMount(
mount_list.append(V1VolumeMount(
mount_path=f'/opt/federation-lab/{conf.get_log_dir()}',
name='fl-log-claim',
read_only=False
)]
))
# TODO: Mount volume

# mount_list.append(V1VolumeMount(
# mount_path=f'/opt/federation-lab/experiments',
# name='experiment',
# read_only=True
# ))
# Create mount for configuration
container = V1Container(name=name,
image=conf.cluster_config.image,
@@ -277,7 +284,7 @@ def build_resources(self, arrival_task: ArrivalTask) -> None:
self._buildDescription.resources[tpe] = client.V1ResourceRequirements(requests=typed_req_dict,
limits=typed_req_dict)

def build_container(self, task: ArrivalTask, conf: DistributedConfig):
def build_container(self, task: ArrivalTask, conf: DistributedConfig, config_name_dict: Optional[Dict[str, str]]):
"""
Function to build container descriptions for deploying from within an Orchestrator pod.
@param task:
@@ -291,7 +298,8 @@ def build_container(self, task: ArrivalTask, conf: DistributedConfig):
cmd = _generate_command(conf, task)
for indx, (tpe, curr_resource) in enumerate(self._buildDescription.resources.items()):
self._buildDescription.typed_containers[tpe] = _build_typed_container(conf, cmd, curr_resource,
requires_mount=not indx)
requires_mount=not indx,
experiment_name=config_name_dict[tpe])

def build_tolerations(self, tols: List[Tuple[str, Optional[str], str, str]] = None):
if not tols:
@@ -313,22 +321,16 @@ def build_template(self, config_name_dict: Optional[Dict[str, str]]) -> None:
# Ensure with taints that
# https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/

master_volumes = \
volumes = \
[V1Volume(name="fl-log-claim",
persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name='fl-log-claim'))
]
if config_name_dict:
for tpe, tpe_config_map_name in config_name_dict.items():
V1Volume(name=tpe_config_map_name,
config_map=V1ConfigMapVolumeSource(tpe_config_map_name,
items=[V1KeyToPath(
key='experiment.yaml',
path='configs/experiment.yaml')]
))
# if config_name_dict:
# for tpe, tpe_config_map_name in config_name_dict.items():
# volumes.append(V1Volume(name='experiment',
# config_map=V1ConfigMapVolumeSource(tpe_config_map_name)))
for tpe, container in self._buildDescription.typed_containers.items():
# TODO: Make this less hardcody
volumes = master_volumes if 'Master' in tpe else None

self._buildDescription.typed_templates[tpe] = \
client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "fltk-worker"}),
@@ -384,7 +386,7 @@ def construct_job(conf: DistributedConfig, task: DistributedArrivalTask,
dp_builder = DeploymentBuilder()
dp_builder.create_identifier(task)
dp_builder.build_resources(task)
dp_builder.build_container(task, conf)
dp_builder.build_container(task, conf, config_name_dict)
dp_builder.build_tolerations()
dp_builder.build_template(config_name_dict)
dp_builder.build_spec(task)
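One idiom worth spelling out: in build_container, requires_mount=not indx means only the container at index 0 of the resources mapping receives the log-volume mount (with the example config's ordering that is the Master entry). A tiny, hypothetical illustration:

# Hypothetical stand-in values, illustration only.
resources = {"Master": "requests-a", "Worker": "requests-b"}
for indx, (tpe, _) in enumerate(resources.items()):
    print(tpe, "gets log mount:", not indx)
# Master gets log mount: True
# Worker gets log mount: False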
2 changes: 1 addition & 1 deletion fltk/util/config/arguments.py
@@ -184,7 +184,7 @@ def create_remote_parser(subparsers) -> None:
remote_parser = subparsers.add_parser('remote')
add_default_arguments(remote_parser)

remote_parser.add_argument('rank', type=int, default=None)
remote_parser.add_argument('rank', nargs='?', type=int, default=None)
remote_parser.add_argument('--nic', type=str, default=None)
remote_parser.add_argument('--host', type=str, default=None)

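With nargs='?' the positional rank may now be omitted entirely, leaving it None so launch_remote falls back to the environment variables; a minimal check of that parser behaviour:

# Minimal sketch of the argparse change: rank is optional and defaults to None.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('rank', nargs='?', type=int, default=None)
print(parser.parse_args(['0']).rank)  # 0
print(parser.parse_args([]).rank)     # None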
4 changes: 2 additions & 2 deletions fltk/util/task/config/parameter.py
@@ -1,7 +1,7 @@
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional, OrderedDict, Any, Union, Tuple, Type
from typing import List, Optional, OrderedDict, Any, Union, Tuple, Type, Dict

import torch
from dataclasses_json import dataclass_json, LetterCase, config
@@ -39,7 +39,7 @@ class HyperParameterConfiguration:
test_bs: Optional[int] = field(metadata=config(field_name="testBatchSize"), default_factory=_none_factory)
lr_decay: Optional[float] = field(metadata=config(field_name="learningRateDecay"), default_factory=_none_factory)

def merge_default(self, other: dict[str, Any]):
def merge_default(self, other: Dict[str, Any]):
"""
Function to merge a HyperParameterConfiguration object with a default configuration
@param other:
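Switching the annotation from dict[str, Any] to typing.Dict[str, Any] matters for interpreter compatibility: subscripting the builtin dict in an eagerly evaluated annotation only works on Python 3.9+, while the typing alias also works on older versions. A small sketch:

# Sketch: typing.Dict is subscriptable on Python 3.7/3.8, where dict[str, Any]
# in an (eagerly evaluated) annotation raises TypeError.
from typing import Any, Dict

def merge_example(other: Dict[str, Any]) -> None:  # fine on 3.7+
    ...

# def merge_broken(other: dict[str, Any]) -> None:  # TypeError before 3.9
#     ...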
