Update with debugging results
JMGaljaard committed Sep 26, 2022
1 parent c7878d0 commit 824fae1
Showing 13 changed files with 76 additions and 53 deletions.
14 changes: 7 additions & 7 deletions Dockerfile
@@ -1,4 +1,4 @@
FROM ubuntu:20.04
FROM bitnami/pytorch:1.12.1

MAINTAINER Jeroen Galjaard <[email protected]>

@@ -11,14 +11,14 @@ ARG DEBIAN_FRONTEND=noninteractive
# Define the working directory of the current Docker container
WORKDIR /opt/federation-lab

# Update the Ubuntu software repository and fetch packages
RUN apt-get update \
&& apt-get install -y python3.9
## Update the Ubuntu software repository and fetch packages
#RUN apt-get update \
# && apt-get install -y python3.9

# Setup pip3.9 for dependencies
RUN apt install -y curl python3.9-distutils
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
RUN python3 get-pip.py
#RUN apt install -y curl python3.9-distutils
#RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
#RUN python3 get-pip.py

# Add Pre-downloaded models (otherwise needs be run every-time)
ADD data/ data/
9 changes: 7 additions & 2 deletions charts/orchestrator/Chart.yaml
@@ -1,10 +1,15 @@
name: ../docker-compose-gcloud
description: Helm Chart for running the Federator/Orchestrator for the FLTK framework
version: 0.2.0
description: Helm Chart for running the Orchestrator for the FLTK framework. Requires
version: 0.3.0
apiVersion: v1
appVersion: 1.17.0
keywords:
- Orchestrator
- FLTK
sources:
home:
#dependencies:
#- name: fltk-extractor
# version: "0.1.0"
# repository: "file://../extractor"
# reason:
4 changes: 2 additions & 2 deletions experiments/test/federated.yaml
@@ -5,7 +5,7 @@ cuda: false
scheduler_step_size: 500
scheduler_gamma: 0.2
rounds: 1
epochs: 2500
epochs: 0
lr: 0.05
momentum: 0.011
shuffle: True
@@ -18,7 +18,7 @@ loss_function: CrossEntropyLoss
clients_per_round: 1
distributed: true
single_machine: false
aggregation: Sum
aggregation: FedAvg
dataset_name: mnist
net_name: FashionMNISTCNN
data_sampler: uniform
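For context on the `aggregation: FedAvg` setting that replaces `Sum` above: FedAvg takes a weighted average of the client parameter dictionaries, with each client weighted by the number of samples it trained on. The sketch below is illustrative only and does not reproduce FLTK's own aggregation code; the function and variable names are hypothetical.

from typing import Dict, List

import torch

def fedavg(client_weights: List[Dict[str, torch.Tensor]],
           client_sizes: List[int]) -> Dict[str, torch.Tensor]:
    # Illustrative sketch: weighted average of client state dicts by local dataset size.
    total = float(sum(client_sizes))
    averaged: Dict[str, torch.Tensor] = {}
    for key in client_weights[0]:
        # Scale each client's tensor by its sample fraction and accumulate.
        averaged[key] = sum(weights[key].float() * (size / total)
                            for weights, size in zip(client_weights, client_sizes))
    return averaged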
16 changes: 9 additions & 7 deletions fltk/__main__.py
@@ -5,6 +5,7 @@

import sys


from fltk.launch import launch_extractor, launch_client, launch_single, launch_remote, launch_cluster, launch_signature
from fltk.util.config import get_distributed_config
from fltk.util.config.arguments import create_all_subparsers
@@ -68,14 +69,15 @@ def main():
print('No configuration path is provided.')

launch_fn: launch_signature = __run_op_dict[args.action]
launch_fn(arg_path, conf_path,
_save_get(args, 'rank'),
_save_get(args, 'nic'),
_save_get(args, 'host'),
_save_get(args, 'prefix'),
args,
distributed_config)
try:
launch_fn(arg_path, conf_path,
_save_get(args, 'rank'),
_save_get(args, 'nic'),
_save_get(args, 'host'),
_save_get(args, 'prefix'),
args,
distributed_config)
pass
except Exception as e:
print(f"Failed with reason: {e}")
parser.print_help()
11 changes: 8 additions & 3 deletions fltk/core/client.py
@@ -61,11 +61,13 @@ def run(self):
self.running = True
event = multiprocessing.Event()
while self.running:
# Hack for running on Kubeflow
if not self.request_queue.empty():
self.logger.info("Got request, running synchronously")
request = self.request_queue.get()
self.logger.info(f"Got request, args: {request} running synchronously.")
self.result_queue.put(self.exec_round(*request))
event.wait(1)
self.logger.info(f"Exiting client {self.id}")

def stop_client(self):
"""
@@ -109,6 +111,7 @@ def train(self, num_epochs: int, round_id: int):

outputs = self.net(inputs)
loss = self.loss_function(outputs, labels)

running_loss += loss.detach().item()
loss.backward()
self.optimizer.step()
@@ -123,7 +126,9 @@ def train(self, num_epochs: int, round_id: int):
end_time = time.time()
duration = end_time - start_time
self.logger.info(f'{progress} Train duration is {duration} seconds')

# Clear gradients before we send.
self.optimizer.zero_grad(set_to_none=True)
gc.collect()
return final_running_loss, self.get_nn_parameters()

def set_tau_eff(self, total):
Expand Down Expand Up @@ -154,7 +159,7 @@ def test(self) -> Tuple[float, float, np.array]:
outputs = self.net(images)

_, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member
total += labels.size(0)
total += int(labels.size(0))
correct += (predicted == labels).sum().detach().item()

targets_.extend(labels.cpu().view_as(predicted).numpy())
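The run() loop above now polls a request queue instead of being invoked directly (the inline comment calls it a hack for running on Kubeflow). A stripped-down sketch of that pattern follows; the handler and queue contents are hypothetical stand-ins for the client's exec_round arguments.

import multiprocessing

def poll_requests(request_queue: multiprocessing.Queue,
                  result_queue: multiprocessing.Queue,
                  handle_round) -> None:
    # Illustrative polling loop: run queued requests synchronously, one at a time.
    event = multiprocessing.Event()
    running = True  # in the real client this flag is cleared by stop_client()
    while running:
        if not request_queue.empty():
            args = request_queue.get()              # e.g. a tuple of round arguments
            result_queue.put(handle_round(*args))   # execute and report the result
        event.wait(1)                               # sleep ~1 s between polls instead of busy-waiting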
19 changes: 10 additions & 9 deletions fltk/core/distributed/orchestrator.py
@@ -42,8 +42,9 @@ def _generate_experiment_path_name(task: ArrivalTask, u_id: Union[uuid.UUID, str
@rtype: str
"""
log_dir = config.execution_config.log_path
experiment_path = config.execution_config.experiment_prefix
experiment_name = f"{task.dataset}_{task.network}_{u_id}_{task.replication}"
full_path = f"{log_dir}/{experiment_name}"
full_path = f"{log_dir}/{experiment_path}/{experiment_name}"
return full_path


@@ -209,7 +210,7 @@ def _create_config_maps(self, config_maps: Dict[str, V1ConfigMap]) -> None:
self._v1.create_namespaced_config_map(self._config.cluster_config.namespace,
config_map)

def wait_for_jobs_to_complete(self):
def wait_for_jobs_to_complete(self, ret=False):
"""
Function to wait for all tasks to complete. This allows to wait for all the resources to free-up after running
an experiment. Thereby allowing for running multiple experiments on a single cluster, without letting
@@ -225,14 +226,15 @@ def wait_for_jobs_to_complete(self):
logging.debug(msg=f"Could not retrieve job_status for {task.id}")
job_status = None

if job_status and job_status in {'Completed', 'Failed'}:
if job_status and job_status in {'Completed', 'Failed', 'Succeeded'}:
logging.info(f"{task.id} was completed with status: {job_status}, moving to completed")
task_to_move.add(task)
else:
logging.info(f"Waiting for {task.id} to complete")

self.completed_tasks.update(task_to_move)
self.deployed_tasks.difference_update(task_to_move)
if ret:
return
time.sleep(self.SLEEP_TIME)


@@ -311,7 +313,7 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None:
if clear:
self._clear_jobs()
duration = self._config.get_duration()
while self._alive and (duration < 0 or not time.time() - start_time < duration):
while self._alive and ((duration < 0) or not time.time() - start_time < duration):
# 1. Check arrivals
# If new arrivals, store them in arrival PriorityQueue
while not self._arrival_generator.arrivals.empty():
@@ -338,8 +340,7 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None:
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.add(curr_task)
if not self._config.cluster_config.orchestrator.parallel_execution:
self.wait_for_jobs_to_complete()
self._logger.debug("Still alive...")
time.sleep(self.SLEEP_TIME)
self.wait_for_jobs_to_complete()
self.wait_for_jobs_to_complete(ret=True)
self.wait_for_jobs_to_complete(ret=False)
self.stop()
logging.info('Experiment completed.')
28 changes: 20 additions & 8 deletions fltk/core/federator.py
@@ -1,6 +1,8 @@
from __future__ import annotations

import copy
import gc

import numpy as np
import sklearn
import time
@@ -210,7 +212,7 @@ def set_tau_eff(self):
# responses.append((client, _remote_method_async(Client.set_tau_eff, client.ref, total)))
# torch.futures.wait_all([x[1] for x in responses])

def test(self, net) -> Tuple[float, float, np.array]:
def test(self, net) -> Tuple[float, float, np.array, float]:
"""
Function to test the learned global model by the Federator. This does not take the client distributions in
account but is centralized.
@@ -233,7 +235,7 @@ def test(self, net) -> Tuple[float, float, np.array]:
outputs = net(images)

_, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member
total += labels.size(0)
total += int(labels.size(0))
correct += (predicted == labels).sum().item()

targets_.extend(labels.cpu().view_as(predicted).numpy())
@@ -247,7 +249,7 @@ def test(self, net) -> Tuple[float, float, np.array]:
end_time = time.time()
duration = end_time - start_time
self.logger.info(f'Test duration is {duration} seconds')
return accuracy, loss, confusion_mat
return accuracy, loss, confusion_mat, duration

def exec_round(self, com_round_id: int):
"""
@@ -300,20 +302,30 @@ def all_futures_done(futures: List[torch.Future]) -> bool: # pylint: disable=no
time.sleep(0.1)
# self.logger.info('')
# self.logger.info(f'Waiting for other clients')

send_receive_duration = time.time() - start_time
self.logger.info('Continue with rest [1]')
time.sleep(3)

updated_model = self.aggregation_method(client_weights, client_sizes)
self.logger.info(f"Aggregrating: {len(client_weights)} updates, using {self.config.aggregation}")
updated_model = self.aggregation_method(client_weights, client_sizes)
self.logger.info(f"Updating global model.")

self.update_nn_parameters(updated_model)
del client_weights
gc.collect()
self.logger.info(f"Testing global model.")

test_accuracy, test_loss, conf_mat = self.test(self.net)
test_accuracy, test_loss, conf_mat, test_duration = self.test(self.net)
self.logger.info(f'[Round {com_round_id:>3}] Federator has a accuracy of {test_accuracy} and loss={test_loss}')

end_time = time.time()
duration = end_time - start_time
record = FederatorRecord(len(selected_clients), com_round_id, duration, test_loss, test_accuracy,
record = FederatorRecord(num_selected_clients=len(selected_clients),
round_id=com_round_id,
round_duration=duration,
test_duration=test_duration,
send_receive_duration=send_receive_duration,
test_loss=test_loss,
test_accuracy=test_accuracy,
confusion_matrix=conf_mat)
self.exp_data.append(record)
self.logger.info(f'[Round {com_round_id:>3}] Round duration is {duration} seconds')
6 changes: 4 additions & 2 deletions fltk/datasets/federated/cifar100.py
@@ -26,8 +26,10 @@ def init_train_dataset(self):
transforms.ToTensor(),
normalize
])
self.train_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), train=True, download=True,
transform=transform)
self.train_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(),
train=True,
download=True,
transform=transform)
self.train_sampler = get_sampler(self.train_dataset, self.args)
self.train_loader = DataLoader(self.train_dataset, batch_size=self.args.batch_size, sampler=self.train_sampler)

2 changes: 1 addition & 1 deletion fltk/launch.py
@@ -272,7 +272,7 @@ def launch_remote(arg_path: Path, conf_path: Path, rank: Rank, nic: Optional[NIC
msg = f'Starting with host={host} and port={os.environ["MASTER_PORT"]} and interface={nic}'
logging.log(logging.INFO, msg)
options = rpc.TensorPipeRpcBackendOptions(
num_worker_threads=16,
num_worker_threads=16 if rank == 0 else 4,
rpc_timeout=0, # infinite timeout
init_method='env://',
_transports=["uv"] # Use LibUV backend for async/IO interaction
2 changes: 1 addition & 1 deletion fltk/nets/cifar_100_resnet.py
@@ -82,7 +82,7 @@ def __init__(self, block: Type[torch.nn.Module] = BasicBlock, num_block=None, nu
torch.nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
torch.nn.BatchNorm2d(64),
torch.nn.ReLU(inplace=True))
# we use a different inputsize than the original paper
# we use a different input size than the original paper
# so conv2_x's stride is 1
self.conv2_x = self._make_layer(block, 64, num_block[0], 1)
self.conv3_x = self._make_layer(block, 128, num_block[1], 2)
1 change: 1 addition & 0 deletions fltk/samplers/uniform.py
@@ -9,4 +9,5 @@ class UniformSampler(DistributedSamplerWrapper):
def __init__(self, dataset, num_replicas=None, rank=None, seed=0):
super().__init__(dataset, num_replicas=num_replicas, rank=rank, seed=seed)
indices = list(range(len(self.dataset)))

self.indices = indices[self.rank:self.total_size:self.n_clients]
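The slice on the final line of UniformSampler.__init__ hands each client an interleaved shard of the dataset: start at the client's own rank and step by the number of clients. A small standalone illustration follows; the sizes are made up for demonstration and are normally derived by DistributedSamplerWrapper.

# Strided partitioning: client `rank` takes every n_clients-th index.
indices = list(range(10))        # pretend the dataset has 10 samples
n_clients, total_size = 3, 10
for rank in range(n_clients):
    shard = indices[rank:total_size:n_clients]
    print(rank, shard)
# 0 [0, 3, 6, 9]
# 1 [1, 4, 7]
# 2 [2, 5, 8]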
4 changes: 3 additions & 1 deletion fltk/util/data_container.py
@@ -16,7 +16,9 @@ class DataRecord:
class FederatorRecord(DataRecord):
num_selected_clients: int
round_id: int
round_duration: int
round_duration: float
test_duration: float
send_receive_duration: float
test_loss: float
test_accuracy: float
# Accuracy per class?
13 changes: 3 additions & 10 deletions requirements-cpu.txt
@@ -33,7 +33,6 @@ numpy==1.22.3
oauthlib==3.2.0
packaging==21.0
parameterized==0.8.1
Pillow==8.3.2
Pint==0.17
pluggy==1.0.0
prettyprinter==0.18.0
@@ -65,16 +64,10 @@ tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.0
threadpoolctl==2.1.0
tomli==2.0.1
-f https://download.pytorch.org/whl/torch_stable.html
torch==1.9.0+cpu; sys_platform != "darwin"
--extra-index-url https://download.pytorch.org/whl/cpu
torch==1.9.0; sys_platform == "darwin"
torchmetrics==0.5.0
torchsummary==1.5.1
-f https://download.pytorch.org/whl/torch_stable.html
torchvision==0.10.0+cpu; sys_platform != "darwin"
--extra-index-url https://download.pytorch.org/whl/cpu
torch==1.9.0; sys_platform == "darwin"
torch==1.12.1
torchvision==0.13.1
torchaudio==0.12.1
tqdm==4.49.0
typing-inspect==0.7.1
typing_extensions==4.1.1
