From bb505fcab628e3aafd214db30ec761e8cfc522d2 Mon Sep 17 00:00:00 2001
From: tdye24 <18365225454@163.com>
Date: Sat, 6 Mar 2021 03:28:38 +0000
Subject: [PATCH 1/5] Fix a bug: keep sampled minibatch data as lists

---
 models/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/models/client.py b/models/client.py
index 09eda5b3..7e84792d 100644
--- a/models/client.py
+++ b/models/client.py
@@ -32,7 +32,7 @@ def train(self, num_epochs=1, batch_size=10, minibatch=None):
             frac = min(1.0, minibatch)
             num_data = max(1, int(frac*len(self.train_data["x"])))
             xs, ys = zip(*random.sample(list(zip(self.train_data["x"], self.train_data["y"])), num_data))
-            data = {'x': xs, 'y': ys}
+            data = {'x': list(xs), 'y': list(ys)}
 
             # Minibatch trains for only 1 epoch - multiple local epochs don't make sense!
             num_epochs = 1

From 7eebb6e9a5882e115c511904adf18d907d167427 Mon Sep 17 00:00:00 2001
From: tdye24 <18365225454@163.com>
Date: Mon, 8 Mar 2021 15:13:41 +0000
Subject: [PATCH 2/5] Create a unique model for each client

---
 models/client.py | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/models/client.py b/models/client.py
index 7e84792d..0f6ed8eb 100644
--- a/models/client.py
+++ b/models/client.py
@@ -1,10 +1,16 @@
 import random
 import warnings
+import importlib
 
 
 class Client:
-    
-    def __init__(self, client_id, group=None, train_data={'x' : [],'y' : []}, eval_data={'x' : [],'y' : []}, model=None):
+
+    def __init__(self, client_id, group=None, train_data={'x': [], 'y': []}, eval_data={'x': [], 'y': []}, model=None):
+        model_path = 'femnist.cnn'
+        mod = importlib.import_module(model_path)
+        ClientModel = getattr(mod, 'ClientModel')
+        model = ClientModel(123, *(0.06, 62))
+
         self._model = model
         self.id = client_id
         self.group = group
@@ -30,12 +36,13 @@ def train(self, num_epochs=1, batch_size=10, minibatch=None):
             comp, update = self.model.train(data, num_epochs, batch_size)
         else:
             frac = min(1.0, minibatch)
-            num_data = max(1, int(frac*len(self.train_data["x"])))
+            num_data = max(1, int(frac * len(self.train_data["x"])))
             xs, ys = zip(*random.sample(list(zip(self.train_data["x"], self.train_data["y"])), num_data))
             data = {'x': list(xs), 'y': list(ys)}
 
             # Minibatch trains for only 1 epoch - multiple local epochs don't make sense!
             num_epochs = 1
+            print(id(self.model))
             comp, update = self.model.train(data, num_epochs, num_data)
         num_train_samples = len(data['y'])
         return comp, num_train_samples, update
@@ -88,8 +95,8 @@ def num_samples(self):
         if self.train_data is not None:
             train_size = len(self.train_data['y'])
 
-        test_size = 0 
-        if self.eval_data is not None:  
+        test_size = 0
+        if self.eval_data is not None:
            test_size = len(self.eval_data['y'])
         return train_size + test_size
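A quick illustration of what PATCH 1 fixes: zip(*random.sample(...)) yields tuples, so the minibatch branch used to hand self.model.train() a data dict whose 'x'/'y' entries were tuples, while the full-data branch passes plain lists. A minimal, self-contained sketch (the sample values are made up for illustration):

    import random

    train_data = {'x': ['img0', 'img1', 'img2', 'img3'], 'y': [3, 1, 7, 2]}
    pairs = list(zip(train_data['x'], train_data['y']))
    xs, ys = zip(*random.sample(pairs, 2))

    print(type(xs))                        # <class 'tuple'>, not a list
    data = {'x': list(xs), 'y': list(ys)}  # now matches the structure of the full-data branch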
From 5ddf6a1b186b12ef6af552ef00f27efe27b531cc Mon Sep 17 00:00:00 2001
From: tdye24 <18365225454@163.com>
Date: Mon, 8 Mar 2021 15:14:35 +0000
Subject: [PATCH 3/5] Add a create_fedsp_model function

---
 models/model.py | 38 ++++++++++++++++++++++++++++----------
 1 file changed, 28 insertions(+), 10 deletions(-)

diff --git a/models/model.py b/models/model.py
index 5d81e136..85af102b 100644
--- a/models/model.py
+++ b/models/model.py
@@ -20,11 +20,14 @@ def __init__(self, seed, lr, optimizer=None):
         self._optimizer = optimizer
 
         self.graph = tf.Graph()
+
+        self.sess = tf.Session(graph=self.graph)
+
         with self.graph.as_default():
             tf.set_random_seed(123 + self.seed)
-            self.features, self.labels, self.train_op, self.eval_metric_ops, self.loss = self.create_model()
+            # self.features, self.labels, self.train_op, self.eval_metric_ops, self.loss = self.create_model()
+            self.features, self.labels, self.train_op, self.eval_metric_ops, self.loss = self.create_fedsp_model()
             self.saver = tf.train.Saver()
-        self.sess = tf.Session(graph=self.graph)
 
         self.size = graph_size(self.graph)
 
@@ -43,6 +46,15 @@ def set_params(self, model_params):
             for variable, value in zip(all_vars, model_params):
                 variable.load(value, self.sess)
 
+    # todo (tdye)
+    # model_params is the full parameter list; load only the part belonging to the global encoder
+    def set_global_params(self, model_params):
+        with self.graph.as_default():
+            all_vars = tf.trainable_variables()
+            for variable, value in zip(all_vars, model_params):
+                if variable.name.startswith('global'):
+                    variable.load(value, self.sess)
+
     def get_params(self):
         with self.graph.as_default():
             model_params = self.sess.run(tf.trainable_variables())
@@ -71,6 +83,10 @@ def create_model(self):
         """
         return None, None, None, None, None
 
+    @staticmethod
+    def create_fedsp_model():
+        return None, None, None, None, None
+
     def train(self, data, num_epochs=1, batch_size=10):
         """
         Trains the client model.
@@ -88,22 +104,22 @@ def train(self, data, num_epochs=1, batch_size=10):
             self.run_epoch(data, batch_size)
 
         update = self.get_params()
-        comp = num_epochs * (len(data['y'])//batch_size) * batch_size * self.flops
+        comp = num_epochs * (len(data['y']) // batch_size) * batch_size * self.flops
         return comp, update
 
+    # self.features and self.labels are placeholders
     def run_epoch(self, data, batch_size):
-
+        # batch_data does not actually reshuffle the data; the seed should incorporate the round number (or similar)
         for batched_x, batched_y in batch_data(data, batch_size, seed=self.seed):
-            
             input_data = self.process_x(batched_x)
             target_data = self.process_y(batched_y)
-            
+
             with self.graph.as_default():
                 self.sess.run(self.train_op,
-                    feed_dict={
-                        self.features: input_data,
-                        self.labels: target_data
-                    })
+                              feed_dict={
+                                  self.features: input_data,
+                                  self.labels: target_data
+                              })
 
     def test(self, data):
         """
@@ -124,6 +140,7 @@ def test(self, data):
         acc = float(tot_acc) / x_vecs.shape[0]
         return {ACCURACY_KEY: acc, 'loss': loss}
 
+    # todo: once the session is closed all of its variables are released, so GPU memory should not keep growing?
     def close(self):
         self.sess.close()
 
@@ -150,6 +167,7 @@ def size(self):
     @property
     def cur_model(self):
         return self.model
 
+    # todo: change the way parameters are sent down to the clients
     def send_to(self, clients):
         """Copies server model variables to each of the given clients
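The new set_global_params() above is the heart of the FedSP split: the server still ships its full parameter list (ordered like tf.trainable_variables(), exactly as produced by get_params()), but a client only overwrites the variables whose names start with 'global'. A standalone sketch of the same prefix filter, assuming TF1-style variable names such as 'global_conv1/kernel:0' (the names later introduced in models/femnist/cnn.py); the helper name is illustrative, not part of the patch:

    import tensorflow as tf

    def load_global_subset(sess, graph, server_params, prefix='global'):
        # server_params must be ordered like tf.trainable_variables(), as in get_params()
        with graph.as_default():
            for variable, value in zip(tf.trainable_variables(), server_params):
                if variable.name.startswith(prefix):
                    # shared ('global_*') encoder: take the server's value
                    variable.load(value, sess)
                # everything else (the 'local_*' encoder and the dense head)
                # keeps its client-specific value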
From 87e0f14d53a98778e4ca443ec86d84b7c2b9ff4e Mon Sep 17 00:00:00 2001
From: tdye24 <18365225454@163.com>
Date: Mon, 8 Mar 2021 15:16:35 +0000
Subject: [PATCH 4/5] Download global params rather than all the params

---
 models/server.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/models/server.py b/models/server.py
index 6d01f357..52d488a3 100644
--- a/models/server.py
+++ b/models/server.py
@@ -2,8 +2,9 @@
 
 from baseline_constants import BYTES_WRITTEN_KEY, BYTES_READ_KEY, LOCAL_COMPUTATIONS_KEY
 
+
 class Server:
-    
+
     def __init__(self, client_model):
         self.client_model = client_model
         self.model = client_model.get_params()
@@ -17,12 +18,14 @@ def select_clients(self, my_round, possible_clients, num_clients=20):
             min(num_clients, len(possible_clients)).
 
         Args:
+            my_round: The current round number.
             possible_clients: Clients from which the server can select.
             num_clients: Number of clients to select; default 20
 
         Return:
             list of (num_train_samples, num_test_samples)
         """
         num_clients = min(num_clients, len(possible_clients))
+        # For reproducibility, seed each round's client sampling with the round number
         np.random.seed(my_round)
         self.selected_clients = np.random.choice(possible_clients, num_clients, replace=False)
 
@@ -56,7 +59,13 @@ def train_model(self, num_epochs=1, batch_size=10, minibatch=None, clients=None)
                        BYTES_READ_KEY: 0,
                        LOCAL_COMPUTATIONS_KEY: 0} for c in clients}
         for c in clients:
-            c.model.set_params(self.model)
+            # Send down the global model's parameters
+            # A new function was added for this: c.model.set_global_params
+
+            # FedAvg
+            # c.model.set_params(self.model)
+            # FedSP
+            c.model.set_global_params(self.model)
             comp, num_samples, update = c.train(num_epochs, batch_size, minibatch)
 
             sys_metrics[c.id][BYTES_READ_KEY] += c.model.size
@@ -68,6 +77,7 @@ def train_model(self, num_epochs=1, batch_size=10, minibatch=None, clients=None)
         return sys_metrics
 
     def update_model(self):
+        # Federated averaging (FedAvg)
         total_weight = 0.
         base = [0] * len(self.updates[0][1])
         for (client_samples, client_model) in self.updates:
@@ -97,7 +107,7 @@ def test_model(self, clients_to_test, set_to_use='test'):
             client.model.set_params(self.model)
             c_metrics = client.test(set_to_use)
             metrics[client.id] = c_metrics
-            
+
         return metrics
 
     def get_clients_info(self, clients):
@@ -120,8 +130,8 @@ def save_model(self, path):
         """Saves the server model on checkpoints/dataset/model.ckpt."""
         # Save server model
         self.client_model.set_params(self.model)
-        model_sess = self.client_model.sess 
+        model_sess = self.client_model.sess
         return self.client_model.saver.save(model_sess, path)
 
     def close_model(self):
-        self.client_model.close()
\ No newline at end of file
+        self.client_model.close()
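For reference, the update_model() step that the '# Federated averaging (FedAvg)' comment refers to is a plain sample-weighted average over the (num_samples, update) pairs accumulated in self.updates. A self-contained sketch of the same computation:

    import numpy as np

    def federated_average(updates):
        """updates: list of (client_samples, client_params) pairs, as in Server.updates."""
        total_weight = float(sum(n for n, _ in updates))
        base = [np.zeros_like(v, dtype=np.float64) for v in updates[0][1]]
        for client_samples, client_params in updates:
            for i, v in enumerate(client_params):
                base[i] += client_samples * v.astype(np.float64)
        return [v / total_weight for v in base]

Note that, as written, clients return their full parameter list from train(), so the private 'local_*' variables are averaged as well; they simply never get loaded back, because set_global_params() filters them out on the client side.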
From d4d66245a3e1e59c95e8bb477aa718632f7a6a0e Mon Sep 17 00:00:00 2001
From: tdye24 <18365225454@163.com>
Date: Wed, 26 May 2021 11:09:30 +0000
Subject: [PATCH 5/5] Each client owns a model stored at a unique address

---
 models/client.py      |  14 ++++--
 models/femnist/cnn.py | 100 +++++++++++++++++++++++++++++++++++-------
 models/main.py        |  65 +++++++++++++++++----------
 models/server.py      |  10 ++++-
 4 files changed, 145 insertions(+), 44 deletions(-)

diff --git a/models/client.py b/models/client.py
index 0f6ed8eb..7e8d9a5a 100644
--- a/models/client.py
+++ b/models/client.py
@@ -1,15 +1,23 @@
 import random
 import warnings
 import importlib
+import copy
 
 
 class Client:
 
-    def __init__(self, client_id, group=None, train_data={'x': [], 'y': []}, eval_data={'x': [], 'y': []}, model=None):
-        model_path = 'femnist.cnn'
+    def __init__(self, client_id, group=None, train_data={'x': [], 'y': []}, eval_data={'x': [], 'y': []},
+                 model_info=None):
+        model_path = model_info['model_path']
+        seed = model_info['seed']
+        model_params = model_info['model_params']
         mod = importlib.import_module(model_path)
         ClientModel = getattr(mod, 'ClientModel')
-        model = ClientModel(123, *(0.06, 62))
+        model = ClientModel(seed, *model_params)
+        # model_path = 'femnist.cnn'
+        # mod = importlib.import_module(model_path)
+        # ClientModel = getattr(mod, 'ClientModel')
+        # model = ClientModel(123, *(0.06, 62))
 
         self._model = model
         self.id = client_id
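The model_info dict introduced here is the hand-off that replaces the shared ClientModel instance: main.py (later in this patch) packs the import path, seed and hyper-parameters, and Client and Server each rebuild their own model from it, so every participant owns its own tf.Graph and tf.Session. A condensed sketch of the pattern, using the FEMNIST values hard-coded in PATCH 2 (learning rate 0.06, 62 classes, seed 123):

    import importlib

    model_info = {
        'model_path': 'femnist.cnn',   # module that defines ClientModel
        'seed': 123,
        'model_params': (0.06, 62),    # (lr, num_classes)
    }

    def build_client_model(info):
        mod = importlib.import_module(info['model_path'])
        ClientModel = getattr(mod, 'ClientModel')
        return ClientModel(info['seed'], *info['model_params'])  # fresh graph + session per call

This is also what the print(id(self.model)) check from PATCH 2 and this commit's title are about: every Client now holds a model object at a distinct address, at the cost of one graph and session per client.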
diff --git a/models/femnist/cnn.py b/models/femnist/cnn.py
index 82eafa70..f0d5fd5d 100644
--- a/models/femnist/cnn.py
+++ b/models/femnist/cnn.py
@@ -14,41 +14,107 @@ def __init__(self, seed, lr, num_classes):
 
     def create_model(self):
         """Model function for CNN."""
+        with tf.device('/gpu:0'):
+            features = tf.placeholder(
+                tf.float32, shape=[None, IMAGE_SIZE * IMAGE_SIZE], name='features')
+            labels = tf.placeholder(tf.int64, shape=[None], name='labels')
+            input_layer = tf.reshape(features, [-1, IMAGE_SIZE, IMAGE_SIZE, 1])
+            conv1 = tf.layers.conv2d(
+                inputs=input_layer,
+                filters=32,
+                kernel_size=[5, 5],
+                padding="same",
+                activation=tf.nn.relu)
+            pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
+            conv2 = tf.layers.conv2d(
+                inputs=pool1,
+                filters=64,
+                kernel_size=[5, 5],
+                padding="same",
+                activation=tf.nn.relu)
+            pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
+            pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])
+            dense = tf.layers.dense(inputs=pool2_flat, units=2048, activation=tf.nn.relu)
+            logits = tf.layers.dense(inputs=dense, units=self.num_classes)
+            predictions = {
+                "classes": tf.argmax(input=logits, axis=1),
+                "probabilities": tf.nn.softmax(logits, name="softmax_tensor")
+            }
+            loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)
+            # TODO: Confirm that opt initialized once is ok?
+            train_op = self.optimizer.minimize(
+                loss=loss,
+                global_step=tf.train.get_global_step())
+            eval_metric_ops = tf.count_nonzero(tf.equal(labels, predictions["classes"]))
+            return features, labels, train_op, eval_metric_ops, loss
+
+    # todo fedsp
+    def create_fedsp_model(self):
+        """Model function for FedSP-CNN."""
+
         features = tf.placeholder(
             tf.float32, shape=[None, IMAGE_SIZE * IMAGE_SIZE], name='features')
         labels = tf.placeholder(tf.int64, shape=[None], name='labels')
         input_layer = tf.reshape(features, [-1, IMAGE_SIZE, IMAGE_SIZE, 1])
-        conv1 = tf.layers.conv2d(
-            inputs=input_layer,
-            filters=32,
-            kernel_size=[5, 5],
-            padding="same",
-            activation=tf.nn.relu)
-        pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
-        conv2 = tf.layers.conv2d(
-            inputs=pool1,
+        # global encoder
+        global_conv1 = tf.layers.conv2d(
+            inputs=input_layer,
+            filters=32,
+            kernel_size=[5, 5],
+            padding="same",
+            activation=tf.nn.relu, name='global_conv1')
+        global_pool1 = tf.layers.max_pooling2d(inputs=global_conv1, pool_size=[2, 2], strides=2)
+        global_conv2 = tf.layers.conv2d(
+            inputs=global_pool1,
+            filters=64,
+            kernel_size=[5, 5],
+            padding="same",
+            activation=tf.nn.relu, name='global_conv2')
+        global_pool2 = tf.layers.max_pooling2d(inputs=global_conv2, pool_size=[2, 2], strides=2)
+        global_pool2_flat = tf.reshape(global_pool2, [-1, 7 * 7 * 64])
+
+        # local encoder
+        local_conv1 = tf.layers.conv2d(
+            inputs=input_layer,
+            filters=32,
+            kernel_size=[5, 5],
+            padding="same",
+            activation=tf.nn.relu, name='local_conv1')
+        local_pool1 = tf.layers.max_pooling2d(inputs=local_conv1, pool_size=[2, 2], strides=2)
+        local_conv2 = tf.layers.conv2d(
+            inputs=local_pool1,
             filters=64,
             kernel_size=[5, 5],
             padding="same",
-            activation=tf.nn.relu)
-        pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
-        pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])
-        dense = tf.layers.dense(inputs=pool2_flat, units=2048, activation=tf.nn.relu)
+            activation=tf.nn.relu, name='local_conv2')
+        local_pool2 = tf.layers.max_pooling2d(inputs=local_conv2, pool_size=[2, 2], strides=2)
+        local_pool2_flat = tf.reshape(local_pool2, [-1, 7 * 7 * 64])
+
+        concat_res = tf.concat([global_pool2_flat, local_pool2_flat], 1)
+
+        dense = tf.layers.dense(inputs=concat_res, units=2048, activation=tf.nn.relu)
+
         logits = tf.layers.dense(inputs=dense, units=self.num_classes)
+
         predictions = {
-            "classes": tf.argmax(input=logits, axis=1),
-            "probabilities": tf.nn.softmax(logits, name="softmax_tensor")
+            "classes": tf.argmax(input=logits, axis=1),
+            "probabilities": tf.nn.softmax(logits, name="softmax_tensor")
         }
+
         loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)
         # TODO: Confirm that opt initialized once is ok?
         train_op = self.optimizer.minimize(
             loss=loss,
             global_step=tf.train.get_global_step())
+
         eval_metric_ops = tf.count_nonzero(tf.equal(labels, predictions["classes"]))
+
         return features, labels, train_op, eval_metric_ops, loss
 
-    def process_x(self, raw_x_batch):
+    @staticmethod
+    def process_x(raw_x_batch):
         return np.array(raw_x_batch)
 
-    def process_y(self, raw_y_batch):
+    @staticmethod
+    def process_y(raw_y_batch):
         return np.array(raw_y_batch)
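Not part of the patch, but as a compact reference: the dual-encoder wiring of create_fedsp_model() above (a shared 'global' encoder and a per-client 'local' encoder, concatenated before the dense head) could be sketched in present-day tf.keras roughly as follows. Layer names mirror the TF1 variable scopes so the 'global_' prefix convention used by set_global_params() stays visible; this is an illustrative equivalent, not the code the patch runs:

    import tensorflow as tf

    IMAGE_SIZE = 28
    NUM_CLASSES = 62

    def encoder(x, prefix):
        x = tf.keras.layers.Conv2D(32, 5, padding='same', activation='relu', name=prefix + '_conv1')(x)
        x = tf.keras.layers.MaxPooling2D(2)(x)
        x = tf.keras.layers.Conv2D(64, 5, padding='same', activation='relu', name=prefix + '_conv2')(x)
        x = tf.keras.layers.MaxPooling2D(2)(x)
        return tf.keras.layers.Flatten()(x)

    inputs = tf.keras.Input(shape=(IMAGE_SIZE * IMAGE_SIZE,))           # flat pixels, as in the TF1 model
    images = tf.keras.layers.Reshape((IMAGE_SIZE, IMAGE_SIZE, 1))(inputs)
    shared = encoder(images, 'global')     # synchronized across clients via FedAvg
    private = encoder(images, 'local')     # stays client-specific
    merged = tf.keras.layers.Concatenate()([shared, private])
    dense = tf.keras.layers.Dense(2048, activation='relu')(merged)
    logits = tf.keras.layers.Dense(NUM_CLASSES)(dense)
    model = tf.keras.Model(inputs, logits)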
diff --git a/models/main.py b/models/main.py
index f2bb8461..be32e537 100644
--- a/models/main.py
+++ b/models/main.py
@@ -5,6 +5,7 @@
 import os
 import sys
 import random
+import copy
 import tensorflow as tf
 
 import metrics.writer as metrics_writer
@@ -17,11 +18,11 @@ from utils.args import parse_args
 from utils.model_utils import read_data
 
-STAT_METRICS_PATH = 'metrics/stat_metrics.csv'
-SYS_METRICS_PATH = 'metrics/sys_metrics.csv'
+STAT_METRICS_PATH = 'metrics/metrics_stat.csv'
+SYS_METRICS_PATH = 'metrics/metrics_sys.csv'
 
 
-def main(): 
+def main():
     args = parse_args()
 
     # Set the random seed if provided (affects client sampling, and batching)
     random.seed(1 + args.seed)
@@ -29,11 +30,15 @@ def main():
     model_path = '%s/%s.py' % (args.dataset, args.model)
     if not os.path.exists(model_path):
         print('Please specify a valid dataset and a valid model.')
    model_path = '%s.%s' % (args.dataset, args.model)
-    
+
     print('############################## %s ##############################' % model_path)
-    mod = importlib.import_module(model_path)
-    ClientModel = getattr(mod, 'ClientModel')
+    # todo tdye
+    model_info = {
+        'model_path': model_path
+    }
+    # mod = importlib.import_module(model_path)
+    # ClientModel = getattr(mod, 'ClientModel')
 
     tup = MAIN_PARAMS[args.dataset][args.t]
     num_rounds = args.num_rounds if args.num_rounds != -1 else tup[0]
@@ -45,20 +50,29 @@ def main():
     tf.logging.set_verbosity(tf.logging.WARN)
 
     # Create 2 models
+    # model_params = (0.0003, 62)
+    # Default learning rate
     model_params = MODEL_PARAMS[model_path]
+    # Override the learning rate
+    # Model params after the override
     if args.lr != -1:
         model_params_list = list(model_params)
         model_params_list[0] = args.lr
         model_params = tuple(model_params_list)
 
     # Create client model, and share params with server model
+    # Reset the global default graph
     tf.reset_default_graph()
-    client_model = ClientModel(args.seed, *model_params)
-
+    # model_params (0.06, 62)
+    # client_model = ClientModel(args.seed, *model_params)
+    model_info.update({
+        'seed': args.seed,
+        'model_params': model_params
+    })
     # Create server
-    server = Server(client_model)
+    server = Server(model_info)
 
     # Create clients
-    clients = setup_clients(args.dataset, client_model, args.use_val_set)
+    clients = setup_clients(args.dataset, model_info, args.use_val_set)
     client_ids, client_groups, client_num_samples = server.get_clients_info(clients)
     print('Clients in Total: %d' % len(clients))
@@ -76,16 +90,17 @@ def main():
         c_ids, c_groups, c_num_samples = server.get_clients_info(server.selected_clients)
 
         # Simulate server model training on selected clients' data
-        sys_metrics = server.train_model(num_epochs=args.num_epochs, batch_size=args.batch_size, minibatch=args.minibatch)
+        sys_metrics = server.train_model(num_epochs=args.num_epochs, batch_size=args.batch_size,
+                                         minibatch=args.minibatch)
         sys_writer_fn(i + 1, c_ids, sys_metrics, c_groups, c_num_samples)
-        
+
         # Update server model
         server.update_model()
 
         # Test model
         if (i + 1) % eval_every == 0 or (i + 1) == num_rounds:
             print_stats(i + 1, server, clients, client_num_samples, args, stat_writer_fn, args.use_val_set)
-    
+
     # Save server model
     ckpt_path = os.path.join('checkpoints', args.dataset)
     if not os.path.exists(ckpt_path):
@@ -95,8 +110,9 @@ def main():
     save_path = server.save_model(os.path.join(ckpt_path, '{}.ckpt'.format(args.model)))
     print('Model path: %s' % save_path)
 
     # Close models
     server.close_model()
+
 
 def online(clients):
     """We assume all users are always online."""
     return clients
 
 
@@ -113,20 +129,24 @@ def online(clients):
-def create_clients(users, groups, train_data, test_data, model):
+def create_clients(users, groups, train_data, test_data, model_info):
     if len(groups) == 0:
         groups = [[] for _ in users]
-    clients = [Client(u, g, train_data[u], test_data[u], model) for u, g in zip(users, groups)]
+    clients = [Client(u, g, train_data[u], test_data[u], model_info) for u, g in zip(users, groups)]
+    # clients = []
+    # for u, g in zip(users, groups):
+    #     model = copy.deepcopy(model)
+    #     clients.append(Client(u, g, train_data[u], test_data[u], model))
     return clients
 
 
-def setup_clients(dataset, model=None, use_val_set=False):
+def setup_clients(dataset, model_info=None, use_val_set=False):
     """Instantiates clients based on given train and test data directories.
 
     Return:
         all_clients: list of Client objects.
     """
     eval_set = 'test' if not use_val_set else 'val'
     train_data_dir = os.path.join('..', 'data', dataset, 'data', 'train')
     test_data_dir = os.path.join('..', 'data', dataset, 'data', eval_set)
 
     users, groups, train_data, test_data = read_data(train_data_dir, test_data_dir)
 
-    clients = create_clients(users, groups, train_data, test_data, model)
+    clients = create_clients(users, groups, train_data, test_data, model_info)
 
     return clients
 
 
 def get_stat_writer_function(ids, groups, num_samples, args):
-
     def writer_fn(num_round, metrics, partition):
         metrics_writer.print_metrics(
-            num_round, ids, metrics, groups, num_samples, partition, args.metrics_dir, '{}_{}'.format(args.metrics_name, 'stat'))
+            num_round, ids, metrics, groups, num_samples, partition, args.metrics_dir,
+            '{}_{}'.format(args.metrics_name, 'stat-fedsp'))
 
     return writer_fn
 
 
 def get_sys_writer_function(args):
-
     def writer_fn(num_round, ids, metrics, groups, num_samples):
         metrics_writer.print_metrics(
-            num_round, ids, metrics, groups, num_samples, 'train', args.metrics_dir, '{}_{}'.format(args.metrics_name, 'sys'))
+            num_round, ids, metrics, groups, num_samples, 'train', args.metrics_dir,
+            '{}_{}'.format(args.metrics_name, 'sys-fedsp'))
 
     return writer_fn
 
 
 def print_stats(
-    num_round, server, clients, num_samples, args, writer, use_val_set):
-
+        num_round, server, clients, num_samples, args, writer, use_val_set):
     train_stat_metrics = server.test_model(clients, set_to_use='train')
     print_metrics(train_stat_metrics, num_samples, prefix='train_')
     writer(num_round, train_stat_metrics, 'train')
diff --git a/models/server.py b/models/server.py
index 52d488a3..4998a317 100644
--- a/models/server.py
+++ b/models/server.py
@@ -1,11 +1,19 @@
 import numpy as np
+import importlib
 
 from baseline_constants import BYTES_WRITTEN_KEY, BYTES_READ_KEY, LOCAL_COMPUTATIONS_KEY
 
 
 class Server:
 
-    def __init__(self, client_model):
+    def __init__(self, model_info):
+        model_path = model_info['model_path']
+        seed = model_info['seed']
+        model_params = model_info['model_params']
+        mod = importlib.import_module(model_path)
+        ClientModel = getattr(mod, 'ClientModel')
+        client_model = ClientModel(seed, *model_params)
+
         self.client_model = client_model
         self.model = client_model.get_params()
         self.selected_clients = []