diff --git a/charts/fltk-values.yaml b/charts/fltk-values.yaml index a6247db0..2dead84f 100644 --- a/charts/fltk-values.yaml +++ b/charts/fltk-values.yaml @@ -1,4 +1,4 @@ fltk: - worldsize: 10 + worldsize: 50 config: cloud_experiment.yaml port: 5000 \ No newline at end of file diff --git a/charts/worker/templates/client-slow.yaml b/charts/worker/templates/client-slow.yaml index aa293355..a18772f9 100644 --- a/charts/worker/templates/client-slow.yaml +++ b/charts/worker/templates/client-slow.yaml @@ -43,8 +43,11 @@ spec: cpu: {{ $workercpu }} # 1 GiB ? memory: {{ $workermemory }} + requests: + memory: 1500000000 restartPolicy: Never -status: {} +status: + qosClass: Guaranteed # Helm requires seperation. --- {{- end }} diff --git a/charts/worker/values.yaml b/charts/worker/values.yaml index 5576c851..f46835fc 100644 --- a/charts/worker/values.yaml +++ b/charts/worker/values.yaml @@ -1,3 +1,3 @@ worker: - cpu: 750m - memory: 1300000000 + cpu: 500m + memory: 1500000000 diff --git a/configs/cloud_experiment.yaml b/configs/cloud_experiment.yaml index 985aa75b..566ab044 100644 --- a/configs/cloud_experiment.yaml +++ b/configs/cloud_experiment.yaml @@ -13,7 +13,7 @@ cuda: false experiment_prefix: 'experiment_single_machine' output_location: 'output' tensor_board_active: true -clients_per_round: 10 +clients_per_round: 50 system: federator: # Use the SERVICE provided by the fl-server to connect @@ -21,11 +21,11 @@ system: # Default NIC is eth0 nic: 'eth0' clients: - amount: 10 + amount: 50 poison: seed: 420 ratio: 0.2 attack: type: "flip" config: - - 6: 4 + - 5: 3 diff --git a/fltk/client.py b/fltk/client.py index 437a8581..61ed60f7 100644 --- a/fltk/client.py +++ b/fltk/client.py @@ -89,17 +89,6 @@ def reset_model(self): @return: None @rtype: None """ - # Load the default model - # Delete the network to prevent out of memory exceptions being thrown - try: - del self.net - - # Delete dataloader to prevent out of memory exceptions being thrown - del self.dataset - except Exception as e: - print(f"something went wrong: {e}") - # Load network - self.set_net(self.load_default_model()) # Set loss function for gradient calculation self.loss_function = self.args.get_loss_function()() # Create optimizer (default is SGD): TODO: Move to AdamW? @@ -112,6 +101,7 @@ def reset_model(self): self.args.get_min_lr()) # Reset the epoch counter self.epoch_counter = 0 + self.finished_init = True def ping(self): """ @@ -156,7 +146,6 @@ def init_dataloader(self, pill: PoisonPill = None): # self.dataset = self.args.DistDatasets[self.args.dataset_name](self.args) - self.finished_init = True self.finished_init = True print("Done with init") logging.info('Done with init') @@ -320,6 +309,7 @@ def test(self): def run_epochs(self, num_epoch, pill: PoisonPill = None): """ """ + self.finished_init = False start_time_train = datetime.datetime.now() self.dataset.get_train_sampler().set_epoch_size(num_epoch) loss, weights = self.train(self.epoch_counter, pill) diff --git a/fltk/federator.py b/fltk/federator.py index 28cdeac2..1ddea866 100644 --- a/fltk/federator.py +++ b/fltk/federator.py @@ -112,9 +112,13 @@ def create_clients(self, client_id_triple): self.client_data[id] = [] def update_clients(self, ratio): + # Prevent abrupt ending of the client + self.tb_writer.close() self.tb_writer = SummaryWriter(f'{self.tb_path_base}/{self.config.experiment_prefix}_federator') for client in self.clients: + # Create new writer and close old writer writer = SummaryWriter(f'{self.tb_path_base}/{self.config.experiment_prefix}_client_{client.name}_{ratio}') + client.tb_writer.close() client.tb_writer = writer self.client_data[client.name] = [] @@ -179,7 +183,7 @@ def clients_ready(self): time.sleep(2) logging.info('All clients are ready') - def remote_run_epoch(self, epochs, cur_model: torch.nn.Module, ratio = None): + def remote_run_epoch(self, epochs, cur_model: torch.nn.Module, ratio = None, store_grad=False): responses = [] client_weights = [] selected_clients = self.select_clients(self.config.clients_per_round) @@ -194,13 +198,23 @@ def remote_run_epoch(self, epochs, cur_model: torch.nn.Module, ratio = None): pill = self.attack.get_poison_pill() responses.append((client, _remote_method_async(Client.run_epochs, client.ref, num_epoch=epochs, pill=pill))) self.epoch_counter += epochs - flat_current = flatten_params(cur_model.state_dict()) - for res in responses: - epoch_data, weights = res[1].wait() - # get flatten + accuracy, loss, class_precision, class_recall = self.test_data.test() + # self.tb_writer.add_scalar('training loss', loss, self.epoch_counter * self.test_data.get_client_datasize()) # does not seem to work :( ) + self.tb_writer.add_scalar('accuracy', accuracy, self.epoch_counter * self.test_data.get_client_datasize()) + self.tb_writer.add_scalar('accuracy per epoch', accuracy, self.epoch_counter) + flat_current = None - self.store_gradient(flatten_params(weights) - flat_current, epoch_data.client_id, self.epoch_counter, ratio) + # Test the model before waiting for the model. + self.test_model() + + if store_grad: + flat_current = flatten_params(cur_model.state_dict()) + for res in responses: + epoch_data, weights = res[1].wait() + if store_grad: + # get flatten + self.store_gradient(flatten_params(weights) - flat_current, epoch_data.client_id, self.epoch_counter, ratio) self.client_data[epoch_data.client_id].append(epoch_data) logging.info(f'{res[0]} had a loss of {epoch_data.loss}') logging.info(f'{res[0]} had a epoch data of {epoch_data}') @@ -227,20 +241,8 @@ def remote_run_epoch(self, epochs, cur_model: torch.nn.Module, ratio = None): # test global model logging.info("Testing on global test set") self.test_data.update_nn_parameters(updated_model) - accuracy, loss, class_precision, class_recall = self.test_data.test() - # self.tb_writer.add_scalar('training loss', loss, self.epoch_counter * self.test_data.get_client_datasize()) # does not seem to work :( ) - self.tb_writer.add_scalar('accuracy', accuracy, self.epoch_counter * self.test_data.get_client_datasize()) - self.tb_writer.add_scalar('accuracy per epoch', accuracy, self.epoch_counter) - - responses = [] - for client in self.clients: - responses.append( - (client, _remote_method_async(Client.update_nn_parameters, client.ref, new_params=updated_model))) - - for res in responses: - res[1].wait() - logging.info('Weights are updated') - return self.test_data.net + self.distribute_new_model(updated_model) + return updated_model def update_client_data_sizes(self): responses = [] @@ -275,7 +277,7 @@ def save_epoch_data(self, ratio = None): def ensure_path_exists(self, path): Path(path).mkdir(parents=True, exist_ok=True) - def run(self, ratios = [0.1, 0.2, 0.3] ): + def run(self, ratios = [0.06, 0.12, 0.18] ): """ Main loop of the Federator :return: @@ -286,9 +288,11 @@ def run(self, ratios = [0.1, 0.2, 0.3] ): poison_pill = None save_path = self.config for rat in ratios: - # Get model to calculate gradient updates + # Get model to calculate gradient updates, default is shared between all. model = initialize_default_model(self.config, self.config.get_net()) - + # Re-use the functionality to update + self.distribute_new_model(model.state_dict()) + # Update the clients to point to the newer version. self.update_clients(rat) if self.attack: self.poisoned_workers: List[ClientRef] = self.attack.select_poisoned_workers(self.clients, rat) @@ -296,7 +300,6 @@ def run(self, ratios = [0.1, 0.2, 0.3] ): with open(f"{self.tb_path_base}/config_{rat}_poisoned.txt", 'w') as f: f.writelines(list(map(lambda worker: worker.name, self.poisoned_workers))) poison_pill = self.attack.get_poison_pill() - self.client_reset_model() self.client_load_data(poison_pill) self.ping_all() self.clients_ready() @@ -316,6 +319,11 @@ def run(self, ratios = [0.1, 0.2, 0.3] ): logging.info(f'Saving data') self.save_epoch_data(rat) + # Perform last test on the current model. + self.test_model() + # Reset the model to continue with the next round + self.client_reset_model() + logging.info(f'Federator is stopping') def store_gradient(self, gradient, client_id, epoch, ratio): @@ -337,3 +345,30 @@ def store_gradient(self, gradient, client_id, epoch, ratio): pathlib.Path(directory).mkdir(parents=True, exist_ok=True) # Save using pytorch. torch.save(gradient, f"{directory}/gradient.pt") + + def distribute_new_model(self, updated_model): + """ + Function to update the model on the clients + @return: + @rtype: + """ + responses = [] + for client in self.clients: + responses.append( + (client, _remote_method_async(Client.update_nn_parameters, client.ref, new_params=updated_model))) + + for res in responses: + res[1].wait() + logging.info('Weights are updated') + + def test_model(self): + """ + Function to test the model on the test dataset. + @return: + @rtype: + """ + # Test interleaved to speed up execution, i.e. don't keep the clients waiting. + accuracy, loss, class_precision, class_recall = self.test_data.test() + # self.tb_writer.add_scalar('training loss', loss, self.epoch_counter * self.test_data.get_client_datasize()) # does not seem to work :( ) + self.tb_writer.add_scalar('accuracy', accuracy, self.epoch_counter * self.test_data.get_client_datasize()) + self.tb_writer.add_scalar('accuracy per epoch', accuracy, self.epoch_counter)