diff --git a/fltk/__init__.py b/fltk/__init__.py index d1eb1a0f..92989687 100644 --- a/fltk/__init__.py +++ b/fltk/__init__.py @@ -1,2 +1,2 @@ -__version__ = '0.3.2' \ No newline at end of file +__version__ = '0.4.0' \ No newline at end of file diff --git a/fltk/client.py b/fltk/client.py deleted file mode 100644 index c4cf6ccd..00000000 --- a/fltk/client.py +++ /dev/null @@ -1,798 +0,0 @@ -import copy -import datetime -import os -import random -import time -from dataclasses import dataclass -from typing import List - -import torch -from torch.distributed import rpc -import logging -import numpy as np -from sklearn.metrics import confusion_matrix -from sklearn.metrics import classification_report -from torch.distributed.rpc import RRef - -from fltk.schedulers import MinCapableStepLR -from fltk.strategy.aggregation import FedAvg -from fltk.strategy.offloading import OffloadingStrategy, parse_strategy -from fltk.util.arguments import Arguments -from fltk.util.fed_avg import average_nn_parameters -from fltk.util.log import FLLogger - -import yaml - -from fltk.util.profiler import Profiler -from fltk.util.profilerV2 import Profiler as P2 -from fltk.util.profilerV3 import Profiler as P3 -from fltk.util.results import EpochData -from fltk.util.timer import elapsed_timer - -logging.basicConfig( - level=logging.DEBUG, - - format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s', -) - -global_dict = {} -global_model_weights = {} -global_model_data_size = 0 -global_sender_id = "" -global_offload_received = False -global_local_updates_left = 0 - - -def _call_method(method, rref, *args, **kwargs): - """helper for _remote_method()""" - return method(rref.local_value(), *args, **kwargs) - -def _remote_method(method, rref, *args, **kwargs): - """ - executes method(*args, **kwargs) on the from the machine that owns rref - - very similar to rref.remote().method(*args, **kwargs), but method() doesn't have to be in the remote scope - """ - args = [method, rref] + list(args) - return rpc.rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs) - - -def _remote_method_async(method, rref, *args, **kwargs) -> torch.Future: - args = [method, rref] + list(args) - return rpc.rpc_async(rref.owner(), _call_method, args=args, kwargs=kwargs) - - -def _remote_method_async_by_info(method, worker_info, *args, **kwargs): - args = [method, worker_info] + list(args) - return rpc.rpc_async(worker_info, _call_method, args=args, kwargs=kwargs) - -class Client: - counter = 0 - finished_init = False - dataset = None - epoch_results: List[EpochData] = [] - epoch_counter = 0 - server_ref = None - offloaded_net = None - - # Model offloading - received_offload_model = False - offloaded_model_weights = None - call_to_offload = False - client_to_offload_to : str = None - offloaded_model_ready = False - - strategy = OffloadingStrategy.VANILLA - - deadline_enabled = False - swyh_enabled = False - freeze_layers_enabled = False - offload_enabled = False - dyn_terminate = False - dyn_terminate_swyh = False - - terminate_training = False - offload_release = False - - def __init__(self, id, log_rref, rank, world_size, config = None): - # logging.info(f'Welcome to client {id}') - self.id = id - global_dict['id'] = id - global global_model_weights, global_offload_received, global_model_data_size, global_local_updates_left - global_model_weights = None - global_offload_received = False - global_local_updates_left = 0 - global_model_data_size = 0 - self.log_rref = log_rref - self.rank = rank - self.world_size = world_size - 
# self.args = Arguments(logging) - self.args = config - self.args.init_logger(logging) - self.device = self.init_device() - self.set_net(self.load_default_model()) - self.loss_function = self.args.get_loss_function()() - self.optimizer = self.args.get_optimizer()(self.net.parameters(), - **self.args.optimizer_args) - self.scheduler = MinCapableStepLR(self.args.get_logger(), self.optimizer, - self.args.get_scheduler_step_size(), - self.args.get_scheduler_gamma(), - self.args.get_min_lr()) - self.strategy = OffloadingStrategy.Parse(config.offload_strategy) - self.configure_strategy(self.strategy) - - def load_offloaded_model(self): - self.offloaded_net = self.load_default_model() - self.offloaded_net.to(self.device) - logging.info('Offloaded network loaded') - - def copy_offloaded_model_weights(self): - self.update_nn_parameters(global_model_weights, True) - logging.info('Parameters of offloaded model updated') - self.offloaded_model_ready = True - - def configure_strategy(self, strategy : OffloadingStrategy): - deadline_enabled, swyh_enabled, freeze_layers_enabled, offload_enabled, dyn_terminate, dyn_terminate_swyh = parse_strategy(strategy) - self.deadline_enabled = deadline_enabled - self.swyh_enabled = swyh_enabled - self.freeze_layers_enabled = freeze_layers_enabled - self.offload_enabled = offload_enabled - self.dyn_terminate = dyn_terminate - self.dyn_terminate_swyh = dyn_terminate_swyh - logging.info(f'Offloading strategy={strategy}') - logging.info(f'Offload strategy params: deadline={self.deadline_enabled}, ' - f'swyh={self.swyh_enabled}, freeze={self.freeze_layers_enabled}, ' - f'offload={self.offload_enabled}, dyn_terminate={self.dyn_terminate}, ' - f'dyn_terminate_swyh={self.dyn_terminate_swyh}') - - def set_tau_eff(self, total): - client_weight = self.get_client_datasize() / total - n = self.get_client_datasize() - E = self.args.epochs_per_round - B = 16 # nicely hardcoded :) - tau_eff = int(E * n / B) * client_weight - if hasattr(self.optimizer, 'set_tau_eff'): - self.optimizer.set_tau_eff(tau_eff) - - def init_device(self): - if self.args.cuda and torch.cuda.is_available(): - return torch.device("cuda:0") - else: - return torch.device("cpu") - - def send_reference(self, server_ref): - self.local_log(f'Got worker_info from server {server_ref}') - self.server_ref = server_ref - - def terminate_training_endpoint(self): - logging.info('I got a call for training termination!') - self.terminate_training = True - - @staticmethod - def static_ping(): - print(f'Got static ping with global_dict={global_dict}') - - def ping(self): - self.local_log(f'Pong!') - self.local_log(f'Pong2! 
{self.id}') - return 'pong' - - - def rpc_test(self): - sleep_time = random.randint(1, 5) - time.sleep(sleep_time) - self.local_log(f'sleep for {sleep_time} seconds') - self.counter += 1 - log_line = f'Number of times called: {self.counter}' - self.local_log(log_line) - self.remote_log(log_line) - - def remote_log(self, message): - _remote_method_async(FLLogger.log, self.log_rref, self.id, message, time.time()) - - def local_log(self, message): - logging.info(f'[{self.id}: {time.time()}]: {message}') - - def set_configuration(self, config: str): - yaml_config = yaml.safe_load(config) - - def init(self): - pass - - def init_dataloader(self, ): - self.args.distributed = True - self.args.rank = self.rank - self.args.world_size = self.world_size - # self.dataset = DistCIFAR10Dataset(self.args) - self.dataset = self.args.DistDatasets[self.args.dataset_name](self.args) - self.finished_init = True - logging.info('Done with init') - - def is_ready(self): - logging.info("Client is ready") - return self.finished_init, RRef(self) - - def set_net(self, net): - self.net = net - self.net.to(self.device) - - def load_model_from_file(self, model_file_path): - model_class = self.args.get_net() - default_model_path = os.path.join(self.args.get_default_model_folder_path(), model_class.__name__ + ".model") - return self.load_model_from_file(default_model_path) - - def get_nn_parameters(self): - """ - Return the NN's parameters. - """ - return self.net.state_dict() - - def load_default_model(self): - """ - Load a model from default model file. - - This is used to ensure consistent default model behavior. - """ - model_class = self.args.get_net() - default_model_path = os.path.join(self.args.get_default_model_folder_path(), model_class.__name__ + ".model") - - return self.load_model_from_file(default_model_path) - - def load_model_from_file(self, model_file_path): - """ - Load a model from a file. - - :param model_file_path: string - """ - model_class = self.args.get_net() - model = model_class() - - if os.path.exists(model_file_path): - try: - model.load_state_dict(torch.load(model_file_path)) - except: - self.args.get_logger().warning("Couldn't load model. Attempting to map CUDA tensors to CPU to solve error.") - - model.load_state_dict(torch.load(model_file_path, map_location=torch.device('cpu'))) - else: - self.args.get_logger().warning("Could not find model: {}".format(model_file_path)) - - return model - - def get_client_index(self): - """ - Returns the client index. - """ - return self.client_idx - - def update_nn_parameters(self, new_params, is_offloaded_model = False): - """ - Update the NN's parameters. 
- - :param new_params: New weights for the neural network - :type new_params: dict - """ - if is_offloaded_model: - self.offloaded_net.load_state_dict(copy.deepcopy(new_params), strict=True) - else: - self.net.load_state_dict(copy.deepcopy(new_params), strict=True) - if self.log_rref: - self.remote_log(f'Weights of the model are updated') - - - def report_performance_async(self, performance_data): - self.local_log('Reporting performance') - from fltk.federator import Federator - return _remote_method_async(Federator.perf_metric_endpoint, self.server_ref, self.id, performance_data) - - def report_performance_estimate(self, performance_data): - self.local_log('Reporting performance estimate') - from fltk.federator import Federator - return _remote_method_async(Federator.perf_est_endpoint, self.server_ref, self.id, performance_data) - - @staticmethod - def offload_receive_endpoint(model_weights, num_train_samples, sender_id, local_updates_left): - print(f'Got the offload_receive_endpoint endpoint') - global global_model_weights, global_offload_received, global_model_data_size, global_sender_id, global_local_updates_left - global_model_weights = copy.deepcopy(model_weights.copy()) - global_model_data_size = num_train_samples - global_sender_id = sender_id - global_local_updates_left = local_updates_left - global_offload_received = True - - @staticmethod - def offload_receive_endpoint_2(string): - print(f'Got the offload_receive_endpoint endpoint') - print(f'Got the offload_receive_endpoint endpoint with arg={string}') - # global global_model_weights, global_offload_received - # global_model_weights = model_weights.copy(deep=True) - # global_offload_received = True - - - def call_to_offload_endpoint(self, client_to_offload: RRef, soft_deadline): - self.local_log(f'Got the call to offload endpoint to {client_to_offload}') - self.client_to_offload_to = client_to_offload - self.call_to_offload = True - - def release_from_offloading_endpoint(self): - logging.info('Got a release signal') - self.offload_release = True - - def freeze_layers2(self, until, net): - - def get_children(model: torch.nn.Module): - children = list(model.children()) - flatt_children = [] - if children == []: - return model - else: - for child in children: - try: - flatt_children.extend(get_children(child)) - except TypeError: - flatt_children.append(get_children(child)) - return flatt_children - - for idx, layer in enumerate(get_children(net)): - if idx < until: - print(f'[{idx}] Freezing layer: {layer}') - for param in layer.parameters(): - param.requires_grad = False - def freeze_layers(self, until): - ct = 0 - for child in self.net.children(): - ct += 1 - if ct < until: - for param in child.parameters(): - param.requires_grad = False - - - def unfreeze_layers(self): - for param in self.net.parameters(): - param.requires_grad = True - - def train(self, epoch, deadline: int = None, warmup=False, use_offloaded_model=False): - """ - - Different modes: - 1. Vanilla - 2. Deadline - 3. SWYH - 4. Just Freeze - 5. 
Model Offload - - - :: Vanilla - Disable deadline - Disable swyh - Disable offload - - :: Deadline - We need to keep track of the incoming deadline - We don't need to send data before the deadline - - :param epoch: Current epoch # - :type epoch: int - """ - - perf_data = { - 'total_duration': 0, - 'p_v2_data': None, - 'p_v1_data': None, - 'n_batches': 0 - } - - start_time = time.time() - - if use_offloaded_model: - for param in self.offloaded_net.parameters(): - param.requires_grad = True - deadline_threshold = self.args.deadline_threshold - train_stop_time = None - if self.deadline_enabled and deadline is not None: - train_stop_time = start_time + deadline - deadline_threshold - - # Ignore profiler for now - # p = Profiler() - # p.attach(self.net) - - # self.net.train() - global global_model_weights, global_offload_received - # deadline_time = None - # # save model - # if self.args.should_save_model(epoch): - # self.save_model(epoch, self.args.get_epoch_save_start_suffix()) - - running_loss = 0.0 - final_running_loss = 0.0 - if self.args.distributed: - self.dataset.train_sampler.set_epoch(epoch) - number_of_training_samples = len(self.dataset.get_train_loader()) - self.args.get_logger().info(f'{self.id}: Number of training samples: {number_of_training_samples}') - # self.args.get_logger().info(f'{self.id}: Number of training samples: {len(self.dataset.get_train_loader())}') - # Ignore profiler for now - # performance_metric_interval = 20 - # perf_resp = None - - # Profiling parameters - profiling_size = self.args.profiling_size - if profiling_size == -1: - profiling_size = number_of_training_samples - profiling_data = np.zeros(profiling_size) - profiling_forwards_data = np.zeros(profiling_size) - profiling_backwards_data = np.zeros(profiling_size) - pre_train_loop_data = np.zeros(profiling_size) - post_train_loop_data = np.zeros(profiling_size) - active_profiling = True - split_point = self.args.nets_split_point[self.args.net_name] - p = P2(profiling_size, split_point - 1) - p3 = P3(profiling_size, split_point - 1) - - profiler_active = False - # Freezing effect experiment - if self.rank in self.args.freeze_clients: - logging.info('I need to freeze!') - split_point = self.args.nets_split_point[self.args.net_name] - self.freeze_layers2(split_point, self.net) - else: - if use_offloaded_model: - p.attach(self.offloaded_net) - p3.attach(self.offloaded_net) - else: - p.attach(self.net) - p3.attach(self.net) - profiler_active = True - - control_start_time = time.time() - training_process = 0 - - def calc_optimal_offloading_point(profiler_data, time_till_deadline, iterations_left): - logging.info(f'Calc optimal point: profiler_data={profiler_data}, time_till_deadline={time_till_deadline}, iterations_left={iterations_left}') - ff, cf, cb, fb = profiler_data - full_network = ff + cf + cb + fb - frozen_network = ff + cf + cb - split_point = 0 - for z in range(iterations_left, -1, -1): - x = z - y = iterations_left - x - # print(z) - new_est_split = (x * full_network) + (y * frozen_network) - split_point = x - if new_est_split < time_till_deadline: - break - logging.info(f'The offloading point is a iteration: {split_point}') - logging.info(f'Estimated default runtime={full_network* iterations_left}') - logging.info(f'new_est_split={new_est_split}, deadline={deadline}') - - start_loop_time = time.time() - for i, (inputs, labels) in enumerate(self.dataset.get_train_loader(), 0): - loop_pre_train_start = time.time() - start_train_time = time.time() - - if self.dyn_terminate_swyh or self.dyn_terminate: 
- if self.terminate_training: - logging.info('Got a call to terminate training') - break - - if use_offloaded_model and i > global_local_updates_left: - logging.info(f'Stoppinng training of offloaded model; no local updates left; Was {global_local_updates_left}') - break - if self.offload_enabled and not warmup: - # Check if there is a call to offload - if self.call_to_offload: - self.args.get_logger().info('Got call to offload model') - model_weights = self.get_nn_parameters() - local_updates_left = number_of_training_samples - i - ret = rpc.rpc_async(self.client_to_offload_to, Client.offload_receive_endpoint, args=([model_weights, i, self.id, local_updates_left])) - print(f'Result of rref: {ret}') - # - self.call_to_offload = False - self.client_to_offload_to = None - # This number only works for cifar10cnn - # @TODO: Make this dynamic for other networks - # self.freeze_layers(5) - split_point = self.args.nets_split_point[self.args.net_name] - self.freeze_layers2(split_point, self.net) - - # Check if there is a model to incorporate - # Disable for now to offloading testing - # if global_offload_received: - # self.args.get_logger().info('Merging offloaded model') - # self.args.get_logger().info('FedAvg locally with offloaded model') - # updated_weights = FedAvg({'own': self.get_nn_parameters(), 'remote': global_model_weights}, {'own': i, 'remote': global_model_data_size}) - # - # # updated_weights = average_nn_parameters([self.get_nn_parameters(), global_model_weights]) - # self.args.get_logger().info('Updating local weights due to offloading') - # self.update_nn_parameters(updated_weights) - # global_offload_received = False - # global_model_weights = None - - if self.swyh_enabled and not warmup: - # Deadline - if train_stop_time is not None: - if time.time() >= train_stop_time: - self.args.get_logger().info('Stopping training due to deadline time') - break - # else: - # self.args.get_logger().info(f'Time to deadline: {train_stop_time - time.time()}') - - - - - inputs, labels = inputs.to(self.device), labels.to(self.device) - training_process = i - - # zero the parameter gradients - self.optimizer.zero_grad() - loop_pre_train_end = time.time() - if profiler_active: - p.signal_forward_start() - p3.signal_forward_start() - outputs = None - if use_offloaded_model: - outputs = self.offloaded_net(inputs) - else: - outputs = self.net(inputs) - loss = self.loss_function(outputs, labels) - post_train_time = time.time() - if active_profiling: - profiling_forwards_data[i] = post_train_time - start_train_time - - # Ignore profiler for now - # p.signal_backward_start() - if profiler_active: - p.signal_backward_start() - p3.signal_forward_end() - p3.signal_backwards_start() - loss.backward() - self.optimizer.step() - if profiler_active: - p3.signal_backwards_end() - p.step() - p3.step() - loop_post_train_start = time.time() - # print statistics - running_loss += loss.item() - if i % self.args.get_log_interval() == 0: - self.args.get_logger().info('[%d, %5d] loss: %.3f' % (epoch, i, running_loss / self.args.get_log_interval())) - final_running_loss = running_loss / self.args.get_log_interval() - running_loss = 0.0 - if active_profiling: - profiling_backwards_data[i] = time.time() - post_train_time - - # Ignore profiler for now - # p.set_warmup(True) - # if i % performance_metric_interval == 0: - # # perf_metrics = p.calc_metric(15) - # perf_metrics = p.export_data() - # self.args.get_logger().info(f'Number of events = {len(perf_metrics)}') - # perf_resp = self.report_performance_async(perf_metrics) - 
# p.reset() - if active_profiling: - # print(i) - end_train_time = time.time() - batch_duration = end_train_time - start_train_time - profiling_data[i] = batch_duration - if i == profiling_size-1: - profiler_active = False - active_profiling = False - p.remove_all_handles() - p3.remove_all_handles() - time_per_batch = profiling_data.mean() - logging.info(f'Average batch duration is {time_per_batch}') - profiler_data = p.aggregate_values() - p3_data = p3.aggregate_values() - logging.info(f'Profiler data: {profiler_data}') - logging.info(f'P3 Profiler data: {p3_data}') - calc_optimal_offloading_point(profiler_data, deadline, number_of_training_samples - i) - - # Estimated training time - est_total_time = number_of_training_samples * time_per_batch - logging.info(f'Estimated training time is {est_total_time}') - self.report_performance_estimate((time_per_batch, est_total_time, number_of_training_samples)) - - if self.freeze_layers_enabled and not warmup: - logging.info(f'Checking if need to freeze layers ? {est_total_time} > {deadline}') - if est_total_time > deadline: - logging.info('Will freeze layers to speed up computation') - # This number only works for cifar10cnn - # @TODO: Make this dynamic for other networks - # self.freeze_layers(5) - split_point = self.args.nets_split_point[self.args.net_name] - self.freeze_layers2(split_point, self.net) - # logging.info(f'Batch time is {batch_duration}') - - # Break away from loop for debug purposes - # if i > 5: - # break - loop_post_train_end = time.time() - if active_profiling: - pre_train_loop_data[i] = loop_pre_train_end - loop_pre_train_start - post_train_loop_data[i] = loop_post_train_end - loop_post_train_start - p.remove_all_handles() - p3.remove_all_handles() - control_end_time = time.time() - end_loop_time = time.time() - logging.info(f'Measure end time is {(control_end_time - control_start_time)}') - logging.info(f'Trained on {training_process} samples') - # logging.info(f'Profiler data: {p.get_values()}') - - perf_data['total_duration'] = control_end_time - control_start_time - perf_data['n_batches'] = len(self.dataset.get_train_loader()) - perf_data['p_v2_data'] = p.get_values() - perf_data['p_v3_data'] = p3.get_values() - perf_data['p_v1_data'] = profiling_data - perf_data['pre_train_loop_data'] = pre_train_loop_data - perf_data['post_train_loop_data'] = post_train_loop_data - perf_data['p_v1_pre_loop'] = start_loop_time - start_time - perf_data['p_v1_forwards'] = profiling_forwards_data - perf_data['p_v1_backwards'] = profiling_backwards_data - perf_data['loop_duration'] = end_loop_time - start_loop_time - if not warmup: - self.scheduler.step() - # logging.info(self.optimizer.param_groups) - scheduler_data = { - 'lr': self.scheduler.optimizer.param_groups[0]['lr'], - 'momentum': self.scheduler.optimizer.param_groups[0]['momentum'], - 'wd': self.scheduler.optimizer.param_groups[0]['weight_decay'], - } - - # Reset the layers - self.unfreeze_layers() - - # save model - if self.args.should_save_model(epoch): - self.save_model(epoch, self.args.get_epoch_save_end_suffix()) - perf_data['p_v1_post_loop'] = time.time() - control_end_time - return final_running_loss, self.get_nn_parameters(), training_process, scheduler_data, perf_data - - def test(self, use_offloaded_model = False): - if use_offloaded_model: - self.offloaded_net.eval() - else: - self.net.eval() - - correct = 0 - total = 0 - targets_ = [] - pred_ = [] - loss = 0.0 - with torch.no_grad(): - for (images, labels) in self.dataset.get_test_loader(): - images, labels = 
images.to(self.device), labels.to(self.device) - - if use_offloaded_model: - outputs = self.offloaded_net(images) - else: - outputs = self.net(images) - - _, predicted = torch.max(outputs.data, 1) - total += labels.size(0) - correct += (predicted == labels).sum().item() - - targets_.extend(labels.cpu().view_as(predicted).numpy()) - pred_.extend(predicted.cpu().numpy()) - - loss += self.loss_function(outputs, labels).item() - - accuracy = 100 * correct / total - confusion_mat = confusion_matrix(targets_, pred_) - accuracy_per_class = confusion_mat.diagonal() / confusion_mat.sum(1) - - class_precision = self.calculate_class_precision(confusion_mat) - class_recall = self.calculate_class_recall(confusion_mat) - if False: - self.args.get_logger().debug('Test set: Accuracy: {}/{} ({:.0f}%)'.format(correct, total, accuracy)) - self.args.get_logger().debug('Test set: Loss: {}'.format(loss)) - self.args.get_logger().debug("Classification Report:\n" + classification_report(targets_, pred_)) - self.args.get_logger().debug("Confusion Matrix:\n" + str(confusion_mat)) - self.args.get_logger().debug("Class precision: {}".format(str(class_precision))) - self.args.get_logger().debug("Class recall: {}".format(str(class_recall))) - - return accuracy, loss, class_precision, class_recall, accuracy_per_class - - - def run_epochs(self, num_epoch, deadline: int = None, warmup=False): - """ - Timing data to measure: - Total execution tim: - """ - self.terminate_training = False - start = time.time() - - start_time_train = datetime.datetime.now() - self.call_to_offload = False - self.dataset.get_train_sampler().set_epoch_size(num_epoch) - # Train locally - loss, weights, training_process, scheduler_data, perf_data = self.train(self.epoch_counter, deadline, warmup) - if self.terminate_training and self.dyn_terminate: - logging.info('Not testing data due to termination call') - self.terminate_training = False - return {'own': []} - elif self.terminate_training and self.dyn_terminate_swyh: - self.terminate_training = False - logging.info('Sending back weights due to terminate with swyh') - if not warmup: - self.epoch_counter += num_epoch - elapsed_time_train = datetime.datetime.now() - start_time_train - train_time_ms = int(elapsed_time_train.total_seconds()*1000) - post_training_time = time.time() - - start_time_test = datetime.datetime.now() - accuracy, test_loss, class_precision, class_recall, _accuracy_per_class = self.test() - elapsed_time_test = datetime.datetime.now() - start_time_test - test_time_ms = int(elapsed_time_test.total_seconds()*1000) - post_test_time = time.time() - - # Timing data that needs to be send back - duration_train = post_training_time - start - duration_test = post_test_time - post_training_time - logging.info( - f'Time for training={duration_train}, time for testing={duration_test}, total time={duration_train + duration_test}') - data = EpochData(self.epoch_counter, num_epoch, train_time_ms, test_time_ms, loss, accuracy, test_loss, - class_precision, class_recall, training_process, self.id, client_wall_time=time.time()) - self.epoch_results.append(data) - if hasattr(self.optimizer, 'pre_communicate'): # aka fednova or fedprox - self.optimizer.pre_communicate() - for k, v in weights.items(): - weights[k] = v.cpu() - response_obj = {'own': [data, weights, scheduler_data, perf_data]} - - global global_offload_received - if self.offload_enabled: - logging.info('Waiting to receive offload or being released') - while not (global_offload_received or self.offload_release): - time.sleep(0.1) - 
logging.info(f'Continuing after global_offload_received={global_offload_received} and offload_release={self.offload_release}') - if self.offload_enabled and global_offload_received: - self.configure_strategy(OffloadingStrategy.SWYH) - # self.configure_strategy(OffloadingStrategy.VANILLA) - logging.info('Processing offloaded model') - self.load_offloaded_model() - self.copy_offloaded_model_weights() - elapsed_time = time.time() - start - deadline -= elapsed_time - loss_offload, weights_offload, training_process_offload, scheduler_data_offload, perf_data_offload = self.train(self.epoch_counter, deadline, warmup, use_offloaded_model=True) - accuracy, test_loss, class_precision, class_recall, _accuracy_per_class = self.test(use_offloaded_model=True) - global global_sender_id - data_offload = EpochData(self.epoch_counter, num_epoch, train_time_ms, test_time_ms, loss_offload, accuracy, test_loss, - class_precision, class_recall, training_process, f'{global_sender_id}-offload', client_wall_time=time.time()) - - # Copy GPU tensors to CPU - for k, v in weights_offload.items(): - weights_offload[k] = v.cpu() - response_obj['offload'] = [ data_offload, weights_offload, scheduler_data_offload, perf_data_offload, global_sender_id] - self.configure_strategy(OffloadingStrategy.MODEL_OFFLOAD) - else: - logging.info(f'Not doing offloading due to offload_enabled={self.offload_enabled} and global_offload_received={global_offload_received}') - return response_obj - - def save_model(self, epoch, suffix): - """ - Saves the model if necessary. - """ - self.args.get_logger().debug("Saving model to flat file storage. Save #{}", epoch) - - if not os.path.exists(self.args.get_save_model_folder_path()): - os.mkdir(self.args.get_save_model_folder_path()) - - full_save_path = os.path.join(self.args.get_save_model_folder_path(), "model_" + str(self.client_idx) + "_" + str(epoch) + "_" + suffix + ".model") - torch.save(self.get_nn_parameters(), full_save_path) - - def calculate_class_precision(self, confusion_mat): - """ - Calculates the precision for each class from a confusion matrix. - """ - return np.diagonal(confusion_mat) / np.sum(confusion_mat, axis=0) - - def calculate_class_recall(self, confusion_mat): - """ - Calculates the recall for each class from a confusion matrix. 
- """ - return np.diagonal(confusion_mat) / np.sum(confusion_mat, axis=1) - - def get_client_datasize(self): - return len(self.dataset.get_train_sampler()) - - def __del__(self): - print(f'Client {self.id} is stopping') diff --git a/fltk/federator.py b/fltk/federator.py deleted file mode 100644 index 5acf545d..00000000 --- a/fltk/federator.py +++ /dev/null @@ -1,852 +0,0 @@ -import datetime -import time -from dataclasses import dataclass -from typing import List - -import pandas as pd -import torch -from dataclass_csv import DataclassWriter -from torch.distributed import rpc -from torch.distributed.rpc import RRef, get_worker_info -from torch.utils.data._utils.worker import WorkerInfo - -from fltk.client import Client -from fltk.datasets.data_distribution import distribute_batches_equally -from fltk.strategy.aggregation import FedAvg -from fltk.strategy.client_selection import random_selection, tifl_update_probs, tifl_select_tier_and_decrement -from fltk.strategy.offloading import OffloadingStrategy, parse_strategy -from fltk.util.arguments import Arguments -from fltk.util.base_config import BareConfig -from fltk.util.data_loader_utils import load_train_data_loader, load_test_data_loader, \ - generate_data_loaders_from_distributed_dataset -from fltk.util.fed_avg import average_nn_parameters -from fltk.util.log import FLLogger -from torchsummary import summary -from torch.utils.tensorboard import SummaryWriter -from pathlib import Path -import logging -import numpy as np -import copy - -# from fltk.util.profile_plots import stability_plot, parse_stability_data -from fltk.util.results import EpochData -from fltk.util.tensor_converter import convert_distributed_data_into_numpy - -logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s', -) - - -def _call_method(method, rref, *args, **kwargs): - return method(rref.local_value(), *args, **kwargs) - - -def _call_method_2(method, rref, *args, **kwargs): - print(method) - return method(rref, *args, **kwargs) - -def _remote_method(method, rref, *args, **kwargs): - args = [method, rref] + list(args) - return rpc.rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs) - - -def _remote_method_async(method, rref, *args, **kwargs) -> torch.Future: - args = [method, rref] + list(args) - return rpc.rpc_async(rref.owner(), _call_method, args=args, kwargs=kwargs) - - -def _remote_method_async_by_name(method, client_name, *args, **kwargs) -> torch.Future: - args = [method, client_name] + list(args) - print(client_name) - print(_call_method_2) - return rpc.rpc_sync(client_name, _call_method_2, args=args, kwargs=kwargs) - - -class ClientRef: - ref = None - name = "" - data_size = 0 - tb_writer = None - tb_writer_offload = None - available = False - rank=None - - def __init__(self, name, ref, tensorboard_writer, tensorboard_writer_offload, rank): - self.name = name - self.ref = ref - self.tb_writer = tensorboard_writer - self.tb_writer_offload = tensorboard_writer_offload - self.rank = rank - - def __repr__(self): - return self.name - -@dataclass -class ClientResponse: - id: int - client: ClientRef - future: torch.Future - start_time: float = time.time() - end_time: float = 0 - done: bool = False - dropped = True - terminated = False - - def finish(self): - self.end_time = time.time() - self.done = True - self.dropped = False - print(f'>>>> \t\tClient {self.id} has a duration of {self.duration()}') - - def duration(self): - return self.end_time - self.start_time - - -class Federator: - """ - 
Central component of the Federated Learning System: The Federator - - The Federator is in charge of the following tasks: - - Have a copy of the global model - - Client selection - - Aggregating the client model weights/gradients - - Saving all the metrics - - Use tensorboard to report metrics - - Keep track of timing - - """ - clients: List[ClientRef] = [] - epoch_counter = 0 - client_data = {} - response_list : List[ClientResponse] = [] - response_id = 0 - - reference_lookup = {} - performance_estimate = {} - - # Strategies - deadline_enabled = False - swyh_enabled = False - freeze_layers_enabled = False - offload_enabled = False - dyn_terminate = False - dyn_terminate_swyh = False - warmup_active = False - node_groups = {} - tifl_tier_data = [] - tifl_tier_names = [] - tifl_selected_tier = '' - - exp_start_time = 0 - - strategy = OffloadingStrategy.VANILLA - - - # Keep track of the experiment data - exp_data_general = [] - - epoch_events = [] - - def __init__(self, client_id_triple, num_epochs = 3, config=None): - log_rref = rpc.RRef(FLLogger()) - self.log_rref = log_rref - self.num_epoch = num_epochs - self.config = config - self.tb_path = f'{config.output_location}/{config.experiment_prefix}' - self.ensure_path_exists(self.tb_path) - self.tb_writer = SummaryWriter(f'{self.tb_path}/{config.experiment_prefix}_federator') - self.strategy = OffloadingStrategy.Parse(config.offload_strategy) - self.configure_strategy(self.strategy) - self.create_clients(client_id_triple) - self.config.init_logger(logging) - self.performance_data = {} - - logging.info("Creating test client") - copy_sampler = config.data_sampler - config.data_sampler = "uniform" - self.test_data = Client("test", None, 1, 2, config) - config.data_sampler = copy_sampler - self.reference_lookup[get_worker_info().name] = RRef(self) - - if self.strategy == OffloadingStrategy.TIFL_BASIC or self.strategy == OffloadingStrategy.TIFL_ADAPTIVE: - for k, v in self.config.node_groups.items(): - self.node_groups[k] = list(range(v[0], v[1]+1)) - self.tifl_tier_names.append(k) - - if self.strategy == OffloadingStrategy.TIFL_ADAPTIVE: - num_tiers = len(self.tifl_tier_names) * 1.0 - start_credits = np.ceil(self.config.epochs / num_tiers) - logging.info(f'Tifl starting credits is {start_credits}') - for tier_name in self.tifl_tier_names: - self.tifl_tier_data.append([tier_name, 0, start_credits, 1 / num_tiers]) - residue = 1 - for t in self.tifl_tier_data: - residue -= t[3] - self.tifl_tier_data[0][3] += residue - - # def configure_strategy(self, strategy : OffloadingStrategy): - # if strategy == OffloadingStrategy.VANILLA: - # logging.info('Running with offloading strategy: VANILLA') - # self.deadline_enabled = False - # self.swyh_enabled = False - # self.freeze_layers_enabled = False - # self.offload_enabled = False - # if strategy == OffloadingStrategy.DEADLINE: - # logging.info('Running with offloading strategy: DEADLINE') - # self.deadline_enabled = True - # self.swyh_enabled = False - # self.freeze_layers_enabled = False - # self.offload_enabled = False - # if strategy == OffloadingStrategy.SWYH: - # logging.info('Running with offloading strategy: SWYH') - # self.deadline_enabled = True - # self.swyh_enabled = True - # self.freeze_layers_enabled = False - # self.offload_enabled = False - # if strategy == OffloadingStrategy.FREEZE: - # logging.info('Running with offloading strategy: FREEZE') - # self.deadline_enabled = True - # self.swyh_enabled = False - # self.freeze_layers_enabled = True - # self.offload_enabled = False - # if strategy 
== OffloadingStrategy.MODEL_OFFLOAD: - # logging.info('Running with offloading strategy: MODEL_OFFLOAD') - # self.deadline_enabled = True - # self.swyh_enabled = False - # self.freeze_layers_enabled = True - # self.offload_enabled = True - # if strategy == OffloadingStrategy.TIFL_BASIC: - # logging.info('Running with offloading strategy: TIFL_BASIC') - # self.deadline_enabled = False - # self.swyh_enabled = False - # self.freeze_layers_enabled = False - # self.offload_enabled = False - # logging.info(f'Offload strategy params: deadline={self.deadline_enabled}, swyh={self.swyh_enabled}, freeze={self.freeze_layers_enabled}, offload={self.offload_enabled}') - # - def configure_strategy(self, strategy : OffloadingStrategy): - deadline_enabled, swyh_enabled, freeze_layers_enabled, offload_enabled, dyn_terminate, dyn_terminate_swyh = parse_strategy(strategy) - self.deadline_enabled = deadline_enabled - self.swyh_enabled = swyh_enabled - self.freeze_layers_enabled = freeze_layers_enabled - self.offload_enabled = offload_enabled - self.dyn_terminate = dyn_terminate - self.dyn_terminate_swyh = dyn_terminate_swyh - logging.info(f'Offloading strategy={strategy}') - logging.info(f'Offload strategy params: deadline={self.deadline_enabled}, ' - f'swyh={self.swyh_enabled}, freeze={self.freeze_layers_enabled}, ' - f'offload={self.offload_enabled}, dyn_terminate={self.dyn_terminate}, ' - f'dyn_terminate_swyh={self.dyn_terminate_swyh}') - - def create_clients(self, client_id_triple): - for id, rank, world_size in client_id_triple: - client = rpc.remote(id, Client, kwargs=dict(id=id, log_rref=self.log_rref, rank=rank, world_size=world_size, config=self.config)) - writer = SummaryWriter(f'{self.tb_path}/{self.config.experiment_prefix}_client_{id}') - writer_offload = None - if self.offload_enabled: - writer_offload = SummaryWriter(f'{self.tb_path}/{self.config.experiment_prefix}_client_{id}_offload') - self.clients.append(ClientRef(id, client, tensorboard_writer=writer, tensorboard_writer_offload=writer_offload, rank=rank)) - self.client_data[id] = [] - - def record_epoch_event(self, event: str): - self.epoch_events.append(f'{time.time()} - [{self.epoch_counter}] - {event}') - - def select_clients(self, n = 2): - available_clients = list(filter(lambda x : x.available, self.clients)) - if self.strategy == OffloadingStrategy.TIFL_ADAPTIVE: - tifl_update_probs(self.tifl_tier_data) - self.tifl_selected_tier = tifl_select_tier_and_decrement(self.tifl_tier_data) - client_subset = self.node_groups[self.tifl_selected_tier] - available_clients = list(filter(lambda x: x.rank in client_subset, self.clients)) - if self.strategy == OffloadingStrategy.TIFL_BASIC: - self.tifl_selected_tier = np.random.choice(list(self.node_groups.keys()), 1, replace=False)[0] - logging.info(f'TIFL: Sampling from group {self.tifl_selected_tier} out of{list(self.node_groups.keys())}') - client_subset = self.node_groups[self.tifl_selected_tier] - available_clients = list(filter(lambda x : x.rank in client_subset, self.clients)) - logging.info(f'TIFL: Sampling subgroup {available_clients}') - return random_selection(available_clients, n) - - def ping_all(self): - for client in self.clients: - logging.info(f'Sending ping to {client}') - t_start = time.time() - answer = _remote_method(Client.ping, client.ref) - t_end = time.time() - duration = (t_end - t_start)*1000 - logging.info(f'Ping to {client} is {duration:.3}ms') - - def rpc_test_all(self): - for client in self.clients: - res = _remote_method_async(Client.rpc_test, client.ref) - while not 
res.done(): - pass - - def client_load_data(self): - for client in self.clients: - _remote_method_async(Client.init_dataloader, client.ref) - - def clients_ready(self): - all_ready = False - ready_clients = [] - while not all_ready: - responses = [] - for client in self.clients: - if client.name not in ready_clients: - responses.append((client, _remote_method_async(Client.is_ready, client.ref))) - all_ready = True - for res in responses: - result, client_ref = res[1].wait() - if result: - self.reference_lookup[res[0].name] = client_ref - logging.info(f'{res[0]} is ready') - ready_clients.append(res[0]) - # Set the client to available - res[0].available = True - else: - logging.info(f'Waiting for {res[0]}') - all_ready = False - - time.sleep(2) - - # WorkerInfo(id=1, name="client1").local_value() - # rpc.rpc_sync(self.nameclients[0].ref.owner(), Client.ping, args=(self.clients[0].ref)) - logging.info(f'Sending a ping to client {self.clients[0].name}') - r_ref = rpc.remote(self.clients[0].name, Client.static_ping, args=()) - print(f'Result of rref: {r_ref.to_here()}') - logging.info('All clients are ready') - for idx, c in enumerate(self.clients): - logging.info(f'[{idx}]={c}') - - - def perf_metric_endpoint(self, node_id, perf_data): - if node_id not in self.performance_data.keys(): - self.performance_data[node_id] = [] - self.performance_data[node_id].append(perf_data) - - def perf_est_endpoint(self, node_id, performance_data): - logging.info(f'Received performance estimate of node {node_id}') - self.performance_estimate[node_id] = performance_data - - def send_clients_ref(self): - - for c in self.clients: - # _remote_method_async(Client.send_reference, c.ref, rpc.get_worker_info()) - _remote_method_async(Client.send_reference, c.ref, RRef(self)) - - def num_available_clients(self): - return sum(c.available == True for c in self.clients) - - def process_response_list(self): - for resp in self.response_list: - if resp.future.done(): - resp.finish() - resp.client.available = True - self.response_list = list(filter(lambda x: not x.done, self.response_list)) - - def ask_client_to_offload(self, client1_ref, client2_ref, soft_deadline): - logging.info(f'Offloading call from {client1_ref} to {client2_ref}') - # args = [method, rref] + list(args) - # rpc.rpc_sync(client1_ref, Client.call_to_offload_endpoint, args=(client2_ref)) - # print(_remote_method_async_by_name(Client.client_to_offload_to, client1_ref, client2_ref)) - _remote_method(Client.call_to_offload_endpoint, client1_ref, client2_ref, soft_deadline) - logging.info(f'Done with call to offload') - - def remote_run_epoch(self, epochs, warmup=False, first_epoch=False): - if warmup: - logging.info('This is a WARMUP round') - start_epoch_time = time.time() - deadline = self.config.deadline - deadline_time = self.config.deadline - if first_epoch: - deadline = self.config.first_deadline - deadline_time = self.config.first_deadline - """ - 1. Client selection - 2. Run local updates - 3. Retrieve data - 4. Aggregate data - """ - - client_weights = [] - - client_weights_dict = {} - client_training_process_dict = {} - - - self.record_epoch_event('Starting new round') - while self.num_available_clients() < self.config.clients_per_round: - logging.warning(f'Waiting for enough clients to become available. 
# Available Clients = {self.num_available_clients()}, but need {self.config.clients_per_round}') - self.process_response_list() - time.sleep(1) - - #### Client Selection #### - selected_clients = self.select_clients(self.config.clients_per_round) - - #### Send model to clients #### - responses = [] - for client in selected_clients: - logging.info(f'Send updated model to selected client: {client.name}') - responses.append( - (client, _remote_method_async(Client.update_nn_parameters, client.ref, new_params=self.test_data.get_nn_parameters()))) - - for res in responses: - res[1].wait() - logging.info('Weights are updated') - - ### Clients train locally - # Structure of the async message: - # - Client will respond with two messages: - - # Let clients train locally - - if not self.deadline_enabled: - deadline = 0 - responses: List[ClientResponse] = [] - for client in selected_clients: - cr = ClientResponse(self.response_id, client, _remote_method_async(Client.run_epochs, client.ref, num_epoch=epochs, deadline=deadline, warmup=warmup)) - cr.start_time = time.time() - self.response_id += 1 - self.response_list.append(cr) - responses.append(cr) - client.available = False - # responses.append((client, time.time(), _remote_method_async(Client.run_epochs, client.ref, num_epoch=epochs))) - self.epoch_counter += epochs - - # deadline_time = None - # Wait loop with deadline - start = time.time() - def reached_deadline(): - if deadline_time is None: - return False - # logging.info(f'{(time.time() - start)} >= {deadline_time}') - return (time.time() -start) >= deadline_time - - logging.info('Starting waiting period') - # Wait loop without deadline - all_finished = False - - # Debug for testing! - has_not_called = True - - show_perf_data = True - has_send_terminate = False - while not all_finished and not ((self.deadline_enabled and reached_deadline()) or warmup): - # if self.deadline_enabled and reached_deadline() - # if has_not_called and (time.time() -start) > 10: - # logging.info('Sending call to offload') - # has_not_called = False - # - # self.ask_client_to_offload(self.reference_lookup[selected_clients[0].name], selected_clients[1].name) - - # Check if all performance data has come in - has_all_perf_data = True - - if show_perf_data: - for sc in selected_clients: - if sc.name not in self.performance_estimate.keys(): - has_all_perf_data = False - - if has_all_perf_data: - logging.info('Got all performance data') - print(self.performance_estimate) - show_perf_data = False - - # Make offloading call - # @NOTE: this will only work for the two node scenario - - lowest_est_time = 0 - est_keys = list(self.performance_estimate.keys()) - - # for k, v in self.performance_estimate.items(): - # if v[1] > lowest_est_time: - # lowest_est_time = v[1] - # weak_client = k - # else: - # strong_client = k - - # (time_per_batch, est_total_time, number_of_training_samples) - if self.offload_enabled and not warmup: - first = True - weakest = 0 - strongest = 0 - weak_performance = 0 - strong_performance = 0 - summed_time = 0 - perf_estimate_copy = copy.deepcopy(self.performance_estimate) - offload_calls = [] - for i in range(int(np.floor(len(self.performance_estimate)/2))): - for k, v in perf_estimate_copy.items(): - summed_time += v[1] - # print(v) - if first: - first = False - est_total_time = v[1] - weakest = k - strongest = k - weak_performance = est_total_time - strong_performance = est_total_time - else: - est_total_time = v[1] - if est_total_time > weak_performance: - weak_performance = est_total_time - weakest = k 
- if est_total_time < strong_performance: - strong_performance = est_total_time - strongest = k - self.record_epoch_event(f'Offloading from {weakest} -> {strongest} due to {self.performance_estimate[weakest]} and {self.performance_estimate[strongest]}') - logging.info( - f'Offloading from {weakest} -> {strongest} due to {self.performance_estimate[weakest]} and {self.performance_estimate[strongest]}') - offload_calls.append([weakest, strongest]) - perf_estimate_copy.pop(weakest, None) - perf_estimate_copy.pop(strongest, None) - mean_time_est_time = (summed_time * 1.0) / len(self.performance_estimate.items()) - logging.info(f'Mean time for offloading={mean_time_est_time}') - logging.info('Sending call to offload') - for weak_node, strong_node in offload_calls: - self.ask_client_to_offload(self.reference_lookup[weak_node], strong_node, mean_time_est_time) - logging.info('Releasing clients') - for client in selected_clients: - _remote_method_async(Client.release_from_offloading_endpoint, client.ref) - - # if self.offload_enabled and not warmup: - # logging.info(f'self.performance_estimate={self.performance_estimate}') - # logging.info(f'est_keys={est_keys}') - # weak_client = est_keys[0] - # strong_client = est_keys[1] - # if self.performance_estimate[est_keys[1]][1] > self.performance_estimate[est_keys[0]][1]: - # weak_client = est_keys[1] - # strong_client = est_keys[0] - # - # logging.info(f'Offloading from {weak_client} -> {strong_client} due to {self.performance_estimate[weak_client]} and {self.performance_estimate[strong_client]}') - # logging.info('Sending call to offload') - # self.ask_client_to_offload(self.reference_lookup[selected_clients[0].name], selected_clients[1].name) - - # selected_clients[0] - # logging.info(f'Status of all_finished={all_finished} and deadline={reached_deadline()}') - all_finished = True - - for client_response in responses: - if client_response.future.done(): - if not client_response.done: - client_response.finish() - else: - all_finished = False - if not has_send_terminate and (self.dyn_terminate or self.dyn_terminate_swyh): - num_finished_responses = sum([1 for x in responses if x.done]) - percentage = num_finished_responses / len(responses) - if percentage > self.config.termination_percentage: - logging.info('Sending termination signal') - for cr in responses: - if not cr.done: - if self.dyn_terminate: - cr.terminated = True - _remote_method_async(Client.terminate_training_endpoint, cr.client.ref) - has_send_terminate = True - logging.info(f'Percentage of finished responses: {percentage}, do terminate ? 
{percentage} > {self.config.termination_percentage} = {percentage > self.config.termination_percentage}') - time.sleep(0.1) - logging.info(f'Stopped waiting due to all_finished={all_finished} and deadline={reached_deadline()}') - client_accuracies = [] - for client_response in responses: - if warmup: - break - client = client_response.client - logging.info(f'{client} had a exec time of {client_response.duration()} dropped?={client_response.dropped}') - if client_response.dropped: - client_response.end_time = time.time() - logging.info( - f'{client} had a exec time of {client_response.duration()} dropped?={client_response.dropped}') - - if not client_response.dropped and not client_response.terminated: - client.available = True - logging.info(f'Fetching response for client: {client}') - response_obj = client_response.future.wait() - epoch_data : EpochData - epoch_data, weights, scheduler_data, perf_data = response_obj['own'] - epoch_data.global_epoch_id = self.epoch_counter - epoch_data.global_wall_time = client_response.end_time - self.client_data[epoch_data.client_id].append(epoch_data) - - # logging.info(f'{client} had a loss of {epoch_data.loss}') - # logging.info(f'{client} had a epoch data of {epoch_data}') - # logging.info(f'{client} has trained on {epoch_data.training_process} samples') - self.record_epoch_event(f'{client} had an accuracy of {epoch_data.accuracy}') - self.record_epoch_event(f'{client} had an duration of {client_response.duration()}') - client_accuracies.append(epoch_data.accuracy) - # logging.info(f'{client} has perf data: {perf_data}') - elapsed_time = client_response.end_time - self.exp_start_time - - client.tb_writer.add_scalar('training loss', - epoch_data.loss_train, # for every 1000 minibatches - self.epoch_counter * client.data_size) - - client.tb_writer.add_scalar('accuracy', - epoch_data.accuracy, # for every 1000 minibatches - self.epoch_counter * client.data_size) - - client.tb_writer.add_scalar('accuracy wall time', - epoch_data.accuracy, # for every 1000 minibatches - elapsed_time) - client.tb_writer.add_scalar('training loss per epoch', - epoch_data.loss_train, # for every 1000 minibatches - self.epoch_counter) - - client.tb_writer.add_scalar('accuracy per epoch', - epoch_data.accuracy, # for every 1000 minibatches - self.epoch_counter) - - client.tb_writer.add_scalar('Client time per epoch', - client_response.duration(), # for every 1000 minibatches - self.epoch_counter) - - client.tb_writer.add_scalar('learning rate', - scheduler_data['lr'], - self.epoch_counter) - - client.tb_writer.add_scalar('momentum', - scheduler_data['momentum'], - self.epoch_counter) - - client.tb_writer.add_scalar('weight decay', - scheduler_data['wd'], - self.epoch_counter) - total_time_t1 = perf_data['total_duration'] - loop_duration = perf_data['loop_duration'] - p_v1_time = perf_data['p_v1_data'].mean() * perf_data['n_batches'] - p_v1_time_sum = perf_data['p_v1_data'].sum() - p_v1_pre_loop = perf_data['p_v1_pre_loop'] - p_v1_post_loop = perf_data['p_v1_post_loop'] - pre_train_loop_data = perf_data['pre_train_loop_data'] - post_train_loop_data = perf_data['post_train_loop_data'] - p_v2_forwards = (perf_data['p_v2_data'][0].mean() + perf_data['p_v2_data'][1].mean()) * perf_data['n_batches'] - p_v2_backwards = (perf_data['p_v2_data'][2].mean() + perf_data['p_v2_data'][3].mean()) * perf_data['n_batches'] - p_v3_forwards = (perf_data['p_v3_data'][0].mean() + perf_data['p_v3_data'][1].mean()) * perf_data[ - 'n_batches'] - p_v3_backwards = (perf_data['p_v3_data'][2].mean() + 
perf_data['p_v3_data'][3].mean()) * perf_data[ - 'n_batches'] - p_v2_time = sum([x.mean() for x in perf_data['p_v2_data']]) * perf_data['n_batches'] - p_v1_forwards = perf_data['p_v1_forwards'].mean() * perf_data['n_batches'] - p_v1_backwards = perf_data['p_v1_backwards'].mean() * perf_data['n_batches'] - - # logging.info(f'{client} has time estimates: {[total_time_t1, loop_duration, p_v1_time_sum, p_v1_time, p_v2_time, [p_v1_forwards, p_v1_backwards], [p_v2_forwards, p_v2_backwards]]}') - # logging.info(f'{client} combined times pre post loop stuff: {[p_v1_pre_loop, loop_duration, p_v1_post_loop]} = {sum([p_v1_pre_loop, loop_duration, p_v1_post_loop])} ? {total_time_t1}') - # logging.info(f'{client} p3 time = {p_v3_forwards} + {p_v3_backwards} = {p_v3_forwards+ p_v3_backwards}') - # logging.info(f'{client} Pre train loop time = {pre_train_loop_data.mean()}, post train loop time = {post_train_loop_data.mean()}') - # logging.info(f'{client} p_v1 data: {perf_data["p_v1_data"]}') - - - - client.tb_writer.add_scalar('train_time_estimate_delta', loop_duration - (p_v3_forwards+ p_v3_backwards), self.epoch_counter) - client.tb_writer.add_scalar('train_time_estimate_delta_2', loop_duration - (p_v2_forwards+ p_v2_backwards), self.epoch_counter) - - client_weights.append(weights) - client_weights_dict[client.name] = weights - client_training_process_dict[client.name] = epoch_data.training_process - - if self.strategy == OffloadingStrategy.TIFL_ADAPTIVE: - mean_tier_accuracy = np.mean(client_accuracies) - logging.info(f'TIFL:: the mean accuracy is {mean_tier_accuracy}') - for t in self.tifl_tier_data: - if t[0] == self.tifl_selected_tier: - t[1] = mean_tier_accuracy - - if 'offload' in response_obj: - epoch_data_offload, weights_offload, scheduler_data_offload, perf_data_offload, sender_id = response_obj['offload'] - if epoch_data_offload.client_id not in self.client_data: - self.client_data[epoch_data_offload.client_id] = [] - epoch_data_offload.global_epoch_id = self.epoch_counter - epoch_data_offload.global_wall_time = client_response.end_time - self.client_data[epoch_data_offload.client_id].append(epoch_data_offload) - - writer = client.tb_writer_offload - - writer.add_scalar('training loss', - epoch_data_offload.loss_train, # for every 1000 minibatches - self.epoch_counter * client.data_size) - - writer.add_scalar('accuracy', - epoch_data_offload.accuracy, # for every 1000 minibatches - self.epoch_counter * client.data_size) - - writer.add_scalar('accuracy wall time', - epoch_data_offload.accuracy, # for every 1000 minibatches - elapsed_time) - writer.add_scalar('training loss per epoch', - epoch_data_offload.loss_train, # for every 1000 minibatches - self.epoch_counter) - - writer.add_scalar('accuracy per epoch', - epoch_data_offload.accuracy, # for every 1000 minibatches - self.epoch_counter) - - writer.add_scalar('Client time per epoch', - client_response.duration(), # for every 1000 minibatches - self.epoch_counter) - - writer.add_scalar('learning rate', - scheduler_data_offload['lr'], - self.epoch_counter) - - writer.add_scalar('momentum', - scheduler_data_offload['momentum'], - self.epoch_counter) - - writer.add_scalar('weight decay', - scheduler_data_offload['wd'], - self.epoch_counter) - client_weights.append(weights_offload) - client_weights_dict[epoch_data_offload.client_id] = weights_offload - client_training_process_dict[epoch_data_offload.client_id] = epoch_data_offload.training_process - - self.performance_estimate = {} - if len(client_weights): - logging.info(f'Aggregating 
{len(client_weights)} models')
-        updated_model = FedAvg(client_weights_dict, client_training_process_dict)
-        # updated_model = average_nn_parameters(client_weights)
-
-        # test global model
-        logging.info("Testing on global test set")
-        self.test_data.update_nn_parameters(updated_model)
-        accuracy, loss, class_precision, class_recall, accuracy_per_class = self.test_data.test()
-        # logging.info('Class precision')
-        # logging.warning(accuracy_per_class)
-        # logging.info('Class names')
-        # logging.info(self.test_data.dataset.test_dataset.class_to_idx)
-        # self.tb_writer.add_scalar('training loss', loss, self.epoch_counter * self.test_data.get_client_datasize()) # does not seem to work :( )
-        self.tb_writer.add_scalar('Number of clients dropped', sum([1 for x in responses if x.dropped or x.terminated]), self.epoch_counter)
-
-        self.tb_writer.add_scalar('accuracy', accuracy, self.epoch_counter * self.test_data.get_client_datasize())
-        self.record_epoch_event(f'Global accuracy is {accuracy}')
-        self.tb_writer.add_scalar('accuracy per epoch', accuracy, self.epoch_counter)
-        elapsed_time = time.time() - self.exp_start_time
-        self.tb_writer.add_scalar('accuracy wall time',
-                                  accuracy, # for every 1000 minibatches
-                                  elapsed_time)
-
-        class_acc_dict = {}
-        for idx, acc in enumerate(accuracy_per_class):
-            class_acc_dict[f'{idx}'] = acc
-        self.tb_writer.add_scalars('accuracy per class', class_acc_dict, self.epoch_counter)
-        self.record_epoch_event(f'Accuracy per class is {class_acc_dict}')
-        end_epoch_time = time.time()
-        duration = end_epoch_time - start_epoch_time
-
-
-        self.exp_data_general.append([self.epoch_counter, end_epoch_time, duration, accuracy, loss, class_precision, class_recall])
-
-
-    def set_tau_eff(self):
-        total = sum(client.data_size for client in self.clients)
-        responses = []
-        for client in self.clients:
-            responses.append((client, _remote_method_async(Client.set_tau_eff, client.ref, total)))
-        torch.futures.wait_all([x[1] for x in responses])
-        # for client in self.clients:
-        #     client.set_tau_eff(total)
-
-    def save_experiment_data(self):
-        p = Path(f'./{self.tb_path}')
-        # file_output = f'./{self.config.output_location}'
-        exp_prefix = self.config.experiment_prefix
-        self.ensure_path_exists(p)
-        p /= f'{exp_prefix}-general_data.csv'
-        # general_filename = f'{file_output}/general_data.csv'
-        df = pd.DataFrame(self.exp_data_general, columns=['epoch', 'wall_time', 'duration', 'accuracy', 'loss', 'class_precision', 'class_recall'])
-        df.to_csv(p)
-
-    def update_client_data_sizes(self):
-        responses = []
-        for client in self.clients:
-            responses.append((client, _remote_method_async(Client.get_client_datasize, client.ref)))
-        for res in responses:
-            res[0].data_size = res[1].wait()
-            logging.info(f'{res[0]} had a result of datasize={res[0].data_size}')
-        # @TODO: Use datasize in aggregation method
-
-    def remote_test_sync(self):
-        responses = []
-        for client in self.clients:
-            responses.append((client, _remote_method_async(Client.test, client.ref)))
-
-        for res in responses:
-            accuracy, loss, class_precision, class_recall = res[1].wait()
-            logging.info(f'{res[0]} had a result of accuracy={accuracy}')
-
-    def flush_epoch_events(self):
-        file_output = f'./{self.tb_path}'
-        exp_prefix = self.config.experiment_prefix
-        file_epoch_events = f'{file_output}/{exp_prefix}_federator_events.txt'
-        self.ensure_path_exists(file_output)
-
-        with open(file_epoch_events, 'a') as f:
-            for ev in self.epoch_events:
-                f.write(f'{ev}\n')
-            f.flush()
-
-        self.epoch_events = []
-
-    def save_epoch_data(self):
-        file_output = f'./{self.tb_path}'
-        exp_prefix = self.config.experiment_prefix
-        self.ensure_path_exists(file_output)
-        for key in self.client_data:
-            filename = f'{file_output}/{exp_prefix}_{key}_epochs.csv'
-            logging.info(f'Saving data at {filename}')
-            with open(filename, "w") as f:
-                w = DataclassWriter(f, self.client_data[key], EpochData)
-                w.write()
-
-    def ensure_path_exists(self, path):
-        Path(path).mkdir(parents=True, exist_ok=True)
-
-
-    def run(self):
-        """
-        Main loop of the Federator
-        :return:
-
-
-
-        Steps in federated learning process
-
-        1. Client selection
-        2. Run local updates
-        3. Retrieve data
-        4. Aggregate data
-        """
-        # # Make sure the clients have loaded all the data
-        self.send_clients_ref()
-        self.client_load_data()
-        self.test_data.init_dataloader()
-        self.ping_all()
-        self.clients_ready()
-        self.update_client_data_sizes()
-        self.set_tau_eff()
-
-        epoch_to_run = self.num_epoch
-        addition = 0
-        epoch_to_run = self.config.epochs
-        epoch_size = self.config.epochs_per_cycle
-
-        if self.config.warmup_round:
-            logging.info('Running warmup round')
-            self.remote_run_epoch(epoch_size, warmup=True)
-
-        self.exp_start_time = time.time()
-        for epoch in range(epoch_to_run):
-            self.process_response_list()
-            logging.info(f'Running epoch {epoch}')
-            self.remote_run_epoch(epoch_size)
-            self.flush_epoch_events()
-            addition += 1
-
-
-        logging.info(f'Saving data')
-        self.save_epoch_data()
-        self.save_experiment_data()
-
-        # Ignore profiler for now
-        # logging.info(f'Reporting profile data')
-        # for key in self.performance_data.keys():
-        #     parse_stability_data(self.performance_data[key], save_to_file=True)
-        logging.info(f'Federator is stopping')
-
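The removed Federator.run() docstring above names the four steps of each federated round (client selection, local updates, retrieving results, aggregation). As an illustrative sketch only — `run_round`, `local_update` and `average_parameters` are hypothetical stand-ins, since the real code drives clients over torch RPC via `_remote_method_async` and `remote_run_epoch` as shown above — one synchronous round boils down to:

```python
# Illustrative sketch, not code from this repository: the four steps named in the
# removed Federator.run() docstring, with hypothetical helpers in place of the RPC plumbing.
from typing import Callable, Dict, List

import torch


def average_parameters(client_params: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:
    # Plain FedAvg mean, mirroring the removed fltk.util.fed_avg.average_nn_parameters further down.
    return {name: sum(p[name] for p in client_params) / len(client_params)
            for name in client_params[0]}


def run_round(global_params: Dict[str, torch.Tensor],
              clients: List,
              local_update: Callable) -> Dict[str, torch.Tensor]:
    selected = list(clients)                                             # 1. client selection
    client_params = [local_update(c, global_params) for c in selected]   # 2. local updates, 3. retrieve data
    return average_parameters(client_params)                             # 4. aggregate data
```

In the actual Federator the aggregation is delegated to fltk.strategy.aggregation.FedAvg, after which the new global model is pushed into test_data and evaluated, as the removed code above shows.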
diff --git a/fltk/launch.py b/fltk/launch.py
deleted file mode 100644
index 8e5c783a..00000000
--- a/fltk/launch.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import os
-import sys
-import torch.distributed.rpc as rpc
-import logging
-
-import yaml
-import argparse
-
-import torch.multiprocessing as mp
-from fltk.federator import Federator
-from fltk.util.base_config import BareConfig
-
-logging.basicConfig(level=logging.DEBUG)
-
-
-def run_ps(rpc_ids_triple, args):
-    print(f'Starting the federator...')
-    fed = Federator(rpc_ids_triple, config=args)
-    fed.run()
-
-def run_single(rank, world_size, host = None, args = None, nic = None):
-    logging.info(f'Starting with rank={rank} and world size={world_size}')
-    if host:
-        os.environ['MASTER_ADDR'] = host
-    else:
-        os.environ['MASTER_ADDR'] = '0.0.0.0'
-    os.environ['MASTER_PORT'] = '5000'
-    if nic:
-        os.environ['GLOO_SOCKET_IFNAME'] = nic
-        os.environ['TP_SOCKET_IFNAME'] = nic
-    else:
-        os.environ['GLOO_SOCKET_IFNAME'] = 'eth0'
-        os.environ['TP_SOCKET_IFNAME'] = 'eth0'
-    logging.info(f'Starting with host={os.environ["MASTER_ADDR"]} and port={os.environ["MASTER_PORT"]}')
-    options = rpc.TensorPipeRpcBackendOptions(
-        num_worker_threads=16,
-        rpc_timeout=0, # infinite timeout
-        init_method=f'tcp://{os.environ["MASTER_ADDR"]}:{os.environ["MASTER_PORT"]}'
-    )
-
-    if rank != 0:
-        logging.info(f'Starting worker {rank}')
-        rpc.init_rpc(
-            f"client{rank}",
-            rank=rank,
-            world_size=world_size,
-            rpc_backend_options=options,
-        )
-        # trainer passively waiting for ps to kick off training iterations
-    else:
-        logging.info('Starting the ps')
-        rpc.init_rpc(
-            "ps",
-            rank=rank,
-            world_size=world_size,
-            rpc_backend_options=options
-
-        )
-        run_ps([(f"client{r}", r, world_size) for r in range(1, world_size)], args)
-    # block until all rpc finish
-    rpc.shutdown()
-
-
-def run_spawn(config):
-    world_size = config.world_size
-    master_address = config.federator_host
-    mp.spawn(
-        run_single,
-        args=(world_size, master_address, config),
-        nprocs=world_size,
-        join=True
-    )
\ No newline at end of file
diff --git a/fltk/util/config.py b/fltk/util/config.py
index 9b262606..5682059a 100644
--- a/fltk/util/config.py
+++ b/fltk/util/config.py
@@ -6,6 +6,7 @@
 import torch
 import yaml
 
+from fltk.util.log import getLogger
 from fltk.util.definitions import Dataset, Nets, DataSampler, Optimizations, LogLevel, Aggregations
 
 
@@ -25,6 +26,9 @@ class Config:
     scheduler_step_size: int = 50
     scheduler_gamma: float = 0.5
     min_lr: float = 1e-10
+
+    # @TODO: Set seed from configuration
+    rng_seed = 0
     # Enum
     optimizer: Optimizations = Optimizations.sgd
     optimizer_args = {
@@ -73,7 +77,11 @@ def __init__(self, **kwargs) -> None:
             self.__setattr__(name, value)
             if name == 'output_location':
                 self.output_path = Path(value)
+        self.update_rng_seed()
+
+    def update_rng_seed(self):
+        torch.manual_seed(self.rng_seed)
 
     def get_default_model_folder_path(self):
         return self.default_model_folder_path
 
@@ -100,9 +108,9 @@ def get_loss_function(self):
 
     @classmethod
     def FromYamlFile(cls, path: Path):
-        print(f'Loading yaml from {path.absolute()}')
+        getLogger(__name__).debug(f'Loading yaml from {path.absolute()}')
         with open(path) as file:
             content = yaml.safe_load(file)
             for k, v in content.items():
-                print(f'Inserting key "{k}" into config')
+                getLogger(__name__).debug(f'Inserting key "{k}" into config')
         return cls(**content)
diff --git a/fltk/util/fed_avg.py b/fltk/util/fed_avg.py
deleted file mode 100644
index e60d1684..00000000
--- a/fltk/util/fed_avg.py
+++ /dev/null
@@ -1,12 +0,0 @@
-def average_nn_parameters(parameters):
-    """
-    Averages passed parameters.
-
-    :param parameters: nn model named parameters
-    :type parameters: list
-    """
-    new_params = {}
-    for name in parameters[0].keys():
-        new_params[name] = sum([param[name].data for param in parameters]) / len(parameters)
-
-    return new_params
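The removed fltk/util/fed_avg.py helper takes an unweighted mean over the clients' named parameters, while the removed Federator code above still carries the note "@TODO: Use datasize in aggregation method". A data-size-weighted variant could look like the sketch below; this is an illustration only, not the FedAvg implementation the Federator imports from fltk.strategy.aggregation.

```python
# Illustrative sketch, not code from this repository: a data-size-weighted version of
# the removed average_nn_parameters, in the spirit of the @TODO note above.
from typing import Dict, List

import torch


def weighted_average_nn_parameters(parameters: List[Dict[str, torch.Tensor]],
                                   data_sizes: List[int]) -> Dict[str, torch.Tensor]:
    """Weight each client's parameters by its share of the total training data."""
    total = float(sum(data_sizes))
    new_params = {}
    for name in parameters[0].keys():
        new_params[name] = sum(param[name].data * (size / total)
                               for param, size in zip(parameters, data_sizes))
    return new_params
```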
diff --git a/fltk/util/generate_docker_compose_2.py b/fltk/util/generate_docker_compose_2.py
index a35bd43d..7185fa01 100644
--- a/fltk/util/generate_docker_compose_2.py
+++ b/fltk/util/generate_docker_compose_2.py
@@ -1,7 +1,5 @@
 import copy
 from pathlib import Path
-from pprint import pprint
-
 import yaml
 import numpy as np
 
@@ -10,6 +8,7 @@ def load_yaml_file(file_path: Path):
     with open(file_path) as file:
         return yaml.full_load(file)
 
+
 def generate_client(id, template: dict, world_size: int, type='default', cpu_set=None, num_cpus=1):
     local_template = copy.deepcopy(template)
     key_name = list(local_template.keys())[0]
@@ -20,10 +19,6 @@ def generate_client(id, template: dict, world_size: int, type='default', cpu_set
             local_template[container_name]['environment'][key] = item.format(rank=id)
         if item == 'WORLD_SIZE={world_size}':
             local_template[container_name]['environment'][key] = item.format(world_size=world_size)
-    # for key, item in enumerate(local_template[container_name]):
-    #     if item == 'cpuset: {cpu_set}':
-    #         local_template[container_name][key] = item.format(cpu_set=cpu_set)
-
     local_template[container_name]['ports'] = [f'{5000+id}:5000']
     if cpu_set:
         local_template[container_name]['cpuset'] = f'{cpu_set}'
@@ -32,6 +27,7 @@
         local_template[container_name]['deploy']['resources']['limits']['cpus'] = f'{num_cpus}'
     return local_template, container_name
 
+
 def gen_client(name: str, client_dict: dict, base_path: Path):
     """
     rank (id)
@@ -56,27 +52,22 @@ def gen_client(name: str, client_dict: dict, base_path: Path):
     if client_dict['pin-cores'] is True:
         client_descr_template['num_cores'] = client_dict['num-cores']
     client_descr_template['stub-file'] = client_dict['stub-name']
-    # print(name)
-    # pprint(stub_data)
     client_cpu_speeds = np.abs(np.round(np.random.normal(mu, sigma, size=n), 2))
     client_descriptions = []
     for cpu_speed in client_cpu_speeds:
         client_descr = copy.deepcopy(client_descr_template)
         client_descr['num_cpu'] = cpu_speed
         client_descriptions.append(client_descr)
-        # client_data = copy.deepcopy(client_dict)
-        # client_data.pop('cpu-variation')
-        # print(cpu_speed)
-        # print(np.random.normal(mu, sigma, size=n))
-        # for k, v in client_dict.items():
-        #     print(k)
     return client_descriptions
+
+
 def generate_clients_proporties(clients_dict: dict, path: Path):
     results = []
     for k,v in clients_dict.items():
         results += gen_client(k, v, path)
     return results
 
+
 def generate_compose_file(path: Path):
     """
     Used properties:
@@ -85,32 +76,7 @@ def generate_compose_file(path: Path):
     - path to deploy files
     - random seed?
     """
-    # system = {
-    #
-    #     'federator': {
-    #         'stub-name': 'system_stub.yml',
-    #         'pin-cores': True,
-    #         'num-cores': 1
-    #     },
-    #     'clients': {
-    #         'fast': {
-    #             'stub-name': 'stub_default.yml',
-    #             'amount': 1,
-    #             'pin-cores': True,
-    #             'num-cores': 3,
-    #             'cpu-speed': 3,
-    #             'cpu-variation': 0
-    #         },
-    #         'slow': {
-    #             'stub-name': 'stub_default.yml',
-    #             'amount': 0,
-    #             'pin-cores': True,
-    #             'num-cores': 1,
-    #             'cpu-speed': 1,
-    #             'cpu-variation': 0
-    #         }
-    #     }
-    # }
+
     system_path = path / 'description.yml'
     system = load_yaml_file(system_path)
     # path = Path('deploy/dev_generate')
@@ -136,7 +102,6 @@ def generate_compose_file(path: Path):
             last_core_id += amount
         else:
             system_template['services']['fl_server'].pop('cpuset')
-
     for idx, client_d in enumerate(client_descriptions):
         stub_file = path / client_d['stub-file']
         stub_data = load_yaml_file(stub_file)
@@ -151,14 +116,11 @@ def generate_compose_file(path: Path):
         local_template, container_name = generate_client(idx + 1, stub_data, world_size, client_d['name'], cpu_set, client_d['num_cpu'])
         system_template['services'].update(local_template)
         print(container_name)
-
     with open(r'./docker-compose.yml', 'w') as file:
         yaml.dump(system_template, file, sort_keys=False)
 
 
-
 if __name__ == '__main__':
-
     path = Path('deploy/dev_generate')
     results = generate_compose_file(path)
     print('done')
\ No newline at end of file
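For reference, the generator changed in the last hunks is driven exactly as its __main__ block shows: it reads a description.yml from the deploy directory (whose layout matches the commented-out `system` dict removed above, i.e. a `federator` entry plus named `clients` groups with stub-name, amount, pin-cores, num-cores, cpu-speed and cpu-variation) and writes ./docker-compose.yml. A minimal usage sketch, assuming the deploy/dev_generate path used in the diff:

```python
# Usage sketch based on the __main__ block above; the deploy path and the
# description.yml layout follow the removed commented-out `system` example.
from pathlib import Path

from fltk.util.generate_docker_compose_2 import generate_compose_file

if __name__ == '__main__':
    # Expects deploy/dev_generate/description.yml plus the referenced stub files;
    # emits ./docker-compose.yml in the current working directory.
    generate_compose_file(Path('deploy/dev_generate'))
```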