diff --git a/baselines/fedper/LICENSE b/baselines/fedper/LICENSE
new file mode 100644
index 000000000000..d64569567334
--- /dev/null
+++ b/baselines/fedper/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/baselines/fedper/README.md b/baselines/fedper/README.md
new file mode 100644
index 000000000000..157bc22d2da5
--- /dev/null
+++ b/baselines/fedper/README.md
@@ -0,0 +1,152 @@
+---
+title: Federated Learning with Personalization Layers
+url: https://arxiv.org/abs/1912.00818
+labels: [system heterogeneity, image classification, personalization, horizontal data partition]
+dataset: [CIFAR-10, FLICKR-AES]
+---
+
+# Federated Learning with Personalization Layers
+
+> Note: If you use this baseline in your work, please remember to cite the original authors of the paper as well as the Flower paper.
+
+**Paper:** [arxiv.org/abs/1912.00818](https://arxiv.org/abs/1912.00818)
+
+**Authors:** Manoj Ghuhan Arivazhagan, Vinay Aggarwal, Aaditya Kumar Singh, and Sunav Choudhary
+
+**Abstract:** The emerging paradigm of federated learning strives to enable collaborative training of machine learning models on the network edge without centrally aggregating raw data and hence, improving data privacy. This sharply deviates from traditional machine learning and necessitates design of algorithms robust to various sources of heterogeneity. Specifically, statistical heterogeneity of data across user devices can severely degrade performance of standard federated averaging for traditional machine learning applications like personalization with deep learning. This paper proposes `FedPer`, a base + personalization layer approach for federated training of deep feed forward neural networks, which can combat the ill-effects of statistical heterogeneity. We demonstrate effectiveness of `FedPer` for non-identical data partitions of CIFAR datasets and on a personalized image aesthetics dataset from Flickr.
+
+## About this baseline
+
+**What’s implemented:** The code in this directory replicates the experiments in _Federated Learning with Personalization Layers_ (Arivazhagan et al., 2019) for CIFAR10 and FLICKR-AES datasets, which proposed the `FedPer` model. Specifically, it replicates the results found in figures 2, 4, 7, and 8 in their paper. __Note__ that there is a typo in the caption of Figure 4 in the article; it should be CIFAR10 and __not__ CIFAR100.
+
+**Datasets:** CIFAR10 from PyTorch's Torchvision and FLICKR-AES. FLICKR-AES was proposed as a dataset in _Personalized Image Aesthetics_ (Ren et al., 2017) and can be downloaded using a link provided on their [GitHub](https://github.com/alanspike/personalizedImageAesthetics). One must first download FLICKR-AES-001.zip (5.76GB), extract all inside and place in baseline/FedPer/datasets. To this location, also download the other 2 related files: (1) FLICKR-AES_image_labeled_by_each_worker.csv, and (2) FLICKR-AES_image_score.txt. Images are also scaled to 224x224 for both datasets. This is not explicitly stated in the paper but seems to be boosting performance. Also, for the FLICKR dataset, it is stated in the paper that they use data from clients with more than 60 and less than 290 rated images. This amounts to circa 60 clients and we randomly select 30 out of these (as in the paper). Therefore, the results might differ somewhat but only slightly. Since the pre-processing steps in the paper are somewhat obscure, the metric values in the plots below may differ slightly, but not the overall results and findings.
+
+```bash
+# These steps are not needed if you are only interested in CIFAR-10
+
+# Create the `datasets` directory if it doesn't exist already
+mkdir datasets
+
+# move/copy the downloaded FLICKR-AES-001.zip file to `datasets/`
+
+# unzip dataset to a directory named `flickr`
+cd datasets
+unzip FLICKR-AES-001.zip -d flickr
+
+# then move the .csv files inside flickr
+mv FLICKR-AES_image_labeled_by_each_worker.csv flickr
+mv FLICKR-AES_image_score.txt flickr
+```
+
+**Hardware Setup:** Experiments have been carried out on GPU. 2 different computers managed to run experiments:
+
+- GeForce RTX 3080 16GB
+- GeForce RTX 4090 24GB
+
+It's worth mentioning that GPU memory for each client is ~7.5GB. When training on powerful GPUs, one can reduce the GPU fraction allocated to each client in the configuration, e.g. by setting `num_gpus` to 0.33.
+
+> NOTE: One experiment carried out using 1 GPU (RTX 4090) takes somewhere between 1-3h depending on the dataset and model. Running ResNet34 compared to MobileNet-v1 takes approximately 10-15% longer.
+
+**Contributors:** [William Lindskog](https://github.com/WilliamLindskog)
+
+
+## Experimental Setup
+
+**Task:** Image Classification
+
+**Model:** This directory implements 2 models:
+
+- ResNet34, which can be imported directly (after having installed the packages) from PyTorch, using `from torchvision.models import resnet34`
+- MobileNet-v1
+
+Please see how models are implemented using a so-called model_manager and model_split class since FedPer uses head and base layers in a neural network. These classes are defined in the models.py file and thereafter called when building new models in the directory /implemented_models. Please extend and add new models as you wish.
+
+**Dataset:** CIFAR10, FLICKR-AES. CIFAR10 will be partitioned based on number of classes for data that each client shall receive, e.g. 4 allocated classes could be [1, 3, 5, 9]. FLICKR-AES is an unbalanced dataset, so there we only apply random sampling.
+
+**Training Hyperparameters:** The hyperparameters can be found in conf/base.yaml file which is the configuration file for the main script.
+
+| Description | Default Value |
+| ----------- | ----- |
+| num_clients | 10 |
+| clients per round | 10 |
+| number of rounds | 50 |
+| client resources | {'num_cpus': 4, 'num_gpus': 1 }|
+| learning_rate | 0.01 |
+| batch_size | 128 |
+| optimizer | SGD |
+| algorithm | fedavg|
+
+**Stateful Clients:**
+In this Baseline (FedPer), we must store the state of the local client head while aggregation of body parameters happens at the server. Flower is currently making this possible but for the time being, we resort to storing client _head_ state in a folder called client_states. We store the values after each fit and evaluate function carried out on each client, and call for the state before executing these functions. Moreover, the state of a unique client is accessed using the client ID.
+
+> NOTE: This is a work-around so that the local head parameters are not reset before each fit and evaluate. Nevertheless, it can come to change with future releases.
+
+
+## Environment Setup
+
+To construct the Python environment follow these steps:
+
+```bash
+# Set Python 3.10
+pyenv local 3.10.6
+# Tell poetry to use python 3.10
+poetry env use 3.10.6
+
+# Install the base Poetry environment
+poetry install
+
+# Activate the environment
+poetry shell
+```
+
+## Running the Experiments
+```bash
+python -m fedper.main # this will run using the default settings in the `conf/base.yaml`
+
+# When running models for flickr dataset, it is important to keep batch size at 4 or lower since some clients (for reproducing experiment) will have very few examples of one class
+```
+
+While the config files contain a large number of settings, the ones below are the main ones you'd likely want to modify.
+```bash
+algorithm: fedavg, fedper # these are currently supported
+server_device: 'cuda:0', 'cpu'
+dataset.name: 'cifar10', 'flickr'
+num_classes: 10, 5 # respectively
+dataset.num_classes: 4, 8, 10 # for non-iid split assigning n num_classes to each client (these numbers for CIFAR10 experiments)
+model_name: mobile, resnet
+```
+
+To run multiple runs, one can also make use of `HYDRA`'s multirun option.
+```bash
+# for CIFAR10
+python -m fedper.main --multirun --config-name cifar10 dataset.num_classes=4,8,10 model_name=resnet,mobile algorithm=fedper,fedavg model.num_head_layers=2,3
+
+# to repeat each run 5 times, one can also add
+python -m fedper.main --multirun --config-name cifar10 dataset.num_classes=4,8,10 model_name=resnet,mobile algorithm=fedper,fedavg model.num_head_layers=2,3 '+repeat_num=range(5)'
+```
+
+
+## Expected Results
+
+To reproduce figures make `fedper/run_figures.sh` executable and run it. By default all experiments will be run:
+
+```bash
+# Make fedper/run_figures.sh executable
+chmod u+x fedper/run_figures.sh
+# Run the script
+bash fedper/run_figures.sh
+```
+
+Having run the `run_figures.sh`, the expected results should look something like this:
+
+**MobileNet-v1 and ResNet-34 on CIFAR10**
+
+
+
+**MobileNet-v1 and ResNet-34 on CIFAR10 using varying size of head**
+
+
+
+**MobileNet-v1 and ResNet-34 on FLICKR-AES**
+
+
\ No newline at end of file
diff --git a/baselines/fedper/_static/mobile_plot_figure_2.png b/baselines/fedper/_static/mobile_plot_figure_2.png
new file mode 100644
index 000000000000..b485b850fb39
Binary files /dev/null and b/baselines/fedper/_static/mobile_plot_figure_2.png differ
diff --git a/baselines/fedper/_static/mobile_plot_figure_flickr.png b/baselines/fedper/_static/mobile_plot_figure_flickr.png
new file mode 100644
index 000000000000..76e99927df36
Binary files /dev/null and b/baselines/fedper/_static/mobile_plot_figure_flickr.png differ
diff --git a/baselines/fedper/_static/mobile_plot_figure_num_head.png b/baselines/fedper/_static/mobile_plot_figure_num_head.png
new file mode 100644
index 000000000000..9dcb9f0a3f33
Binary files /dev/null and b/baselines/fedper/_static/mobile_plot_figure_num_head.png differ
diff --git a/baselines/fedper/_static/resnet_plot_figure_2.png b/baselines/fedper/_static/resnet_plot_figure_2.png
new file mode 100644
index 000000000000..14e3a7145a23
Binary files /dev/null and b/baselines/fedper/_static/resnet_plot_figure_2.png differ
diff --git a/baselines/fedper/_static/resnet_plot_figure_flickr.png b/baselines/fedper/_static/resnet_plot_figure_flickr.png
new file mode 100644
index 000000000000..4e6ba71489b7
Binary files /dev/null and b/baselines/fedper/_static/resnet_plot_figure_flickr.png differ
diff --git a/baselines/fedper/_static/resnet_plot_figure_num_head.png b/baselines/fedper/_static/resnet_plot_figure_num_head.png
new file mode 100644
index 000000000000..03c6ac88b84a
Binary files /dev/null and b/baselines/fedper/_static/resnet_plot_figure_num_head.png differ
diff --git a/baselines/fedper/fedper/__init__.py b/baselines/fedper/fedper/__init__.py
new file mode 100644
index 000000000000..a5e567b59135
--- /dev/null
+++ b/baselines/fedper/fedper/__init__.py
@@ -0,0 +1 @@
+"""Template baseline package."""
diff --git a/baselines/fedper/fedper/client.py b/baselines/fedper/fedper/client.py
new file mode 100644
index 000000000000..83babbd9613f
--- /dev/null
+++ b/baselines/fedper/fedper/client.py
@@ -0,0 +1,353 @@
+"""Client implementation - can call FedPer and FedAvg clients."""
+import pickle
+from collections import OrderedDict, defaultdict
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Tuple, Type, Union
+
+import numpy as np
+import torch
+from flwr.client import NumPyClient
+from flwr.common import NDArrays, Scalar
+from omegaconf import DictConfig
+from torch.utils.data import DataLoader, Subset, random_split
+from torchvision import transforms
+from torchvision.datasets import ImageFolder
+
+from fedper.constants import MEAN, STD
+from fedper.dataset_preparation import call_dataset
+from fedper.implemented_models.mobile_model import MobileNetModelManager
+from fedper.implemented_models.resnet_model import ResNetModelManager
+
+PROJECT_DIR = Path(__file__).parent.parent.absolute()
+
+
+class ClientDataloaders:
+ """Client dataloaders."""
+
+ def __init__(
+ self,
+ trainloader: DataLoader,
+ testloader: DataLoader,
+ ) -> None:
+ """Initialize the client dataloaders."""
+ self.trainloader = trainloader
+ self.testloader = testloader
+
+
+class ClientEssentials:
+ """Client essentials."""
+
+ def __init__(
+ self,
+ client_id: str,
+ client_state_save_path: str = "",
+ ) -> None:
+ """Set client state save path and client ID."""
+ self.client_id = int(client_id)
+ self.client_state_save_path = (
+ (client_state_save_path + f"/client_{self.client_id}")
+ if client_state_save_path != ""
+ else None
+ )
+
+
+class BaseClient(NumPyClient):
+ """Implementation of Federated Averaging (FedAvg) Client."""
+
+ def __init__(
+ self,
+ data_loaders: ClientDataloaders,
+ config: DictConfig,
+ client_essentials: ClientEssentials,
+ model_manager_class: Union[
+ Type[MobileNetModelManager], Type[ResNetModelManager]
+ ],
+ ):
+ """Initialize client attributes.
+
+ Args:
+ config: dictionary containing the client configurations.
+ client_id: id of the client.
+ model_manager_class: class to be used as the model manager.
+ """
+ super().__init__()
+
+ self.train_id = 1
+ self.test_id = 1
+ self.client_id = int(client_essentials.client_id)
+ self.client_state_save_path = client_essentials.client_state_save_path
+ self.hist: Dict[str, Dict[str, Any]] = defaultdict(dict)
+ self.num_epochs: int = config["num_epochs"]
+ self.model_manager = model_manager_class(
+ client_id=self.client_id,
+ config=config,
+ trainloader=data_loaders.trainloader,
+ testloader=data_loaders.testloader,
+ client_save_path=self.client_state_save_path,
+ learning_rate=config["learning_rate"],
+ )
+
+ def get_parameters(self, config: Dict[str, Scalar]) -> NDArrays:
+ """Return the current local model parameters."""
+ return self.model_manager.model.get_parameters()
+
+ def set_parameters(
+ self, parameters: List[np.ndarray], evaluate: bool = False
+ ) -> None:
+ """Set the local model parameters to the received parameters.
+
+ Args:
+ parameters: parameters to set the model to.
+ """
+ _ = evaluate
+ model_keys = [
+ k
+ for k in self.model_manager.model.state_dict().keys()
+ if k.startswith("_body") or k.startswith("_head")
+ ]
+ params_dict = zip(model_keys, parameters)
+
+ state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
+
+ self.model_manager.model.set_parameters(state_dict)
+
+ def perform_train(
+ self,
+ ) -> Dict[str, Union[List[Dict[str, float]], int, float]]:
+ """Perform local training to the whole model.
+
+ Returns
+ -------
+ Dict with the train metrics.
+ """
+ epochs = self.num_epochs
+
+ self.model_manager.model.enable_body()
+ self.model_manager.model.enable_head()
+
+ return self.model_manager.train(
+ epochs=epochs,
+ )
+
+ def fit(
+ self, parameters: NDArrays, config: Dict[str, Scalar]
+ ) -> Tuple[NDArrays, int, Dict[str, Union[bool, bytes, float, int, str]]]:
+ """Train the provided parameters using the locally held dataset.
+
+ Args:
+ parameters: The current (global) model parameters.
+ config: configuration parameters for training sent by the server.
+
+ Returns
+ -------
+ Tuple containing the locally updated model parameters, \
+ the number of examples used for training and \
+ the training metrics.
+ """
+ self.set_parameters(parameters)
+
+ train_results = self.perform_train()
+
+ # Update train history
+ self.hist[str(self.train_id)] = {
+ **self.hist[str(self.train_id)],
+ "trn": train_results,
+ }
+ print("<------- TRAIN RESULTS -------> :", train_results)
+
+ self.train_id += 1
+
+ return self.get_parameters(config), self.model_manager.train_dataset_size(), {}
+
+ def evaluate(
+ self, parameters: NDArrays, config: Dict[str, Scalar]
+ ) -> Tuple[float, int, Dict[str, Union[bool, bytes, float, int, str]]]:
+ """Evaluate the provided global parameters using the locally held dataset.
+
+ Args:
+ parameters: The current (global) model parameters.
+ config: configuration parameters for training sent by the server.
+
+ Returns
+ -------
+ Tuple containing the test loss, \
+ the number of examples used for evaluation and \
+ the evaluation metrics.
+ """
+ self.set_parameters(parameters, evaluate=True)
+
+ # Test the model
+ tst_results = self.model_manager.test()
+ print("<------- TEST RESULTS -------> :", tst_results)
+
+ # Update test history
+ self.hist[str(self.test_id)] = {
+ **self.hist[str(self.test_id)],
+ "tst": tst_results,
+ }
+ self.test_id += 1
+
+ return (
+ tst_results.get("loss", 0.0),
+ self.model_manager.test_dataset_size(),
+ {k: v for k, v in tst_results.items() if not isinstance(v, (dict, list))},
+ )
+
+
+class FedPerClient(BaseClient):
+ """Implementation of Federated Personalization (FedPer) Client."""
+
+ def get_parameters(self, config: Dict[str, Scalar]) -> NDArrays:
+ """Return the current local body parameters."""
+ return [
+ val.cpu().numpy()
+ for _, val in self.model_manager.model.body.state_dict().items()
+ ]
+
+ def set_parameters(self, parameters: List[np.ndarray], evaluate=False) -> None:
+ """Set the local body parameters to the received parameters.
+
+ Args:
+ parameters: parameters to set the body to.
+ evaluate: whether the client is evaluating or not.
+ """
+ model_keys = [
+ k
+ for k in self.model_manager.model.state_dict().keys()
+ if k.startswith("_body")
+ ]
+
+ if not evaluate:
+ # Only update client's local head if it hasn't trained yet
+ print("Setting head parameters to global head parameters.")
+ model_keys.extend(
+ [
+ k
+ for k in self.model_manager.model.state_dict().keys()
+ if k.startswith("_head")
+ ]
+ )
+
+ params_dict = zip(model_keys, parameters)
+
+ state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
+
+ self.model_manager.model.set_parameters(state_dict)
+
+
+def get_client_fn_simulation(
+ config: DictConfig,
+ client_state_save_path: str = "",
+) -> Callable[[str], Union[FedPerClient, BaseClient]]:
+ """Generate the client function that creates the Flower Clients.
+
+ Parameters
+ ----------
+ model : DictConfig
+ The model configuration.
+    client_state_save_path : str
+ The path to save the client state.
+
+ Returns
+ -------
+ Tuple[Callable[[str], FlowerClient], DataLoader]
+ A tuple containing the client function that creates Flower Clients and
+ the DataLoader that will be used for testing
+ """
+ assert config.model_name.lower() in [
+ "mobile",
+ "resnet",
+    ], f"Model {config.model_name} not implemented"
+
+ # load dataset and clients' data indices
+ if config.dataset.name.lower() == "cifar10":
+ try:
+ partition_path = (
+ PROJECT_DIR / "datasets" / config.dataset.name / "partition.pkl"
+ )
+ print(f"Loading partition from {partition_path}")
+ with open(partition_path, "rb") as pickle_file:
+ partition = pickle.load(pickle_file)
+ data_indices: Dict[int, Dict[str, List[int]]] = partition["data_indices"]
+ except FileNotFoundError as error:
+ print(f"Partition not found at {partition_path}")
+ raise error
+
+ # - you can define your own data transformation strategy here -
+ general_data_transform = transforms.Compose(
+ [
+ transforms.Resize((224, 224)),
+ transforms.RandomCrop(224, padding=4),
+ # transforms.RandomHorizontalFlip(),
+ # transforms.ToTensor(),
+ transforms.Normalize(
+ MEAN[config.dataset.name], STD[config.dataset.name]
+ ),
+ ]
+ )
+ # ------------------------------------------------------------
+
+ def client_fn(cid: str) -> BaseClient:
+ """Create a Flower client representing a single organization."""
+ cid_use = int(cid)
+ if config.dataset.name.lower() == "flickr":
+ transform = transforms.Compose(
+ [
+ transforms.Resize((224, 224)),
+ transforms.ToTensor(),
+ ]
+ )
+ data_path = (
+ PROJECT_DIR / "datasets" / config.dataset.name / "tmp" / f"client_{cid}"
+ )
+ dataset = ImageFolder(root=data_path, transform=transform)
+ trainset, testset = random_split(
+ dataset,
+ [int(len(dataset) * 0.8), len(dataset) - int(len(dataset) * 0.8)],
+ )
+ else:
+ dataset = call_dataset(
+ dataset_name=config.dataset.name,
+ root=PROJECT_DIR / "datasets" / config.dataset.name,
+ general_data_transform=general_data_transform,
+ )
+
+ trainset = Subset(dataset, indices=[])
+ testset = Subset(dataset, indices=[])
+ trainset.indices = data_indices[cid_use]["train"]
+ testset.indices = data_indices[cid_use]["test"]
+
+ # Create the train loader
+ trainloader = DataLoader(trainset, config.batch_size, shuffle=False)
+ # Create the test loader
+ testloader = DataLoader(testset, config.batch_size)
+
+ manager: Union[
+ Type[MobileNetModelManager], Type[ResNetModelManager]
+ ] = MobileNetModelManager
+ if config.model_name.lower() == "resnet":
+ manager = ResNetModelManager
+ elif config.model_name.lower() == "mobile":
+ manager = MobileNetModelManager
+ else:
+ raise NotImplementedError("Model not implemented, check name.")
+ client_data_loaders = ClientDataloaders(trainloader, testloader)
+ client_essentials = ClientEssentials(
+ client_id=cid,
+ client_state_save_path=client_state_save_path,
+ )
+ if client_state_save_path != "":
+ return FedPerClient(
+ data_loaders=client_data_loaders,
+ client_essentials=client_essentials,
+ config=config,
+ model_manager_class=manager,
+ )
+ return BaseClient(
+ data_loaders=client_data_loaders,
+ client_essentials=client_essentials,
+ config=config,
+ model_manager_class=manager,
+ )
+
+ return client_fn
diff --git a/baselines/fedper/fedper/conf/base.yaml b/baselines/fedper/fedper/conf/base.yaml
new file mode 100644
index 000000000000..b0b9778d4682
--- /dev/null
+++ b/baselines/fedper/fedper/conf/base.yaml
@@ -0,0 +1,44 @@
+---
+num_clients: 10 # total number of clients
+num_epochs: 4 # number of local epochs
+batch_size: 128
+num_rounds: 100
+clients_per_round: 10
+learning_rate: 0.01
+algorithm: fedper
+model_name: resnet
+
+client_resources:
+ num_cpus: 4
+ num_gpus: 1
+
+server_device: cuda:0
+
+dataset:
+ name: "cifar10"
+ split: sample
+ num_classes: 10
+ seed: 42
+ num_clients: ${num_clients}
+ fraction: 0.83
+
+model:
+ _target_: null
+ num_head_layers: 2
+ num_classes: 10
+
+fit_config:
+ drop_client: false
+ epochs: ${num_epochs}
+ batch_size: ${batch_size}
+
+strategy:
+ _target_: fedper.server.DefaultStrategyPipeline
+ fraction_fit: 0.00001 # because we want the number of clients to sample on each round to be solely defined by min_fit_clients
+ min_fit_clients: ${clients_per_round}
+ fraction_evaluate: 0.0
+ min_evaluate_clients: ${clients_per_round}
+ min_available_clients: ${num_clients}
+ algorithm: ${algorithm}
+ evaluate_fn: None
+ on_evaluate_config_fn: None
\ No newline at end of file
diff --git a/baselines/fedper/fedper/conf/cifar10.yaml b/baselines/fedper/fedper/conf/cifar10.yaml
new file mode 100644
index 000000000000..66a06d481507
--- /dev/null
+++ b/baselines/fedper/fedper/conf/cifar10.yaml
@@ -0,0 +1,44 @@
+---
+num_clients: 10 # total number of clients
+num_epochs: 4 # number of local epochs
+batch_size: 128
+num_rounds: 50
+clients_per_round: 10
+learning_rate: 0.01
+algorithm: fedavg
+model_name: resnet
+
+client_resources:
+ num_cpus: 4
+ num_gpus: 1
+
+server_device: cuda:0
+
+dataset:
+ name: "cifar10"
+ split: sample
+ num_classes: 10
+ seed: 42
+ num_clients: ${num_clients}
+ fraction: 0.83
+
+model:
+ _target_: null
+ num_head_layers: 2
+ num_classes: 10
+
+fit_config:
+ drop_client: false
+ epochs: ${num_epochs}
+ batch_size: ${batch_size}
+
+strategy:
+ _target_: fedper.server.DefaultStrategyPipeline
+ fraction_fit: 0.00001 # because we want the number of clients to sample on each round to be solely defined by min_fit_clients
+ min_fit_clients: ${clients_per_round}
+ fraction_evaluate: 0.0
+ min_evaluate_clients: ${clients_per_round}
+ min_available_clients: ${num_clients}
+ algorithm: ${algorithm}
+ evaluate_fn: None
+ on_evaluate_config_fn: None
\ No newline at end of file
diff --git a/baselines/fedper/fedper/conf/flickr.yaml b/baselines/fedper/fedper/conf/flickr.yaml
new file mode 100644
index 000000000000..341b1c0ac6c2
--- /dev/null
+++ b/baselines/fedper/fedper/conf/flickr.yaml
@@ -0,0 +1,44 @@
+---
+num_clients: 30 # total number of clients
+num_epochs: 4 # number of local epochs
+batch_size: 4
+num_rounds: 35
+clients_per_round: 30
+learning_rate: 0.01
+algorithm: fedper
+model_name: resnet
+
+client_resources:
+ num_cpus: 4
+ num_gpus: 1
+
+server_device: cuda:0
+
+dataset:
+ name: "flickr"
+ split: sample
+ num_classes: 5
+ seed: 42
+ num_clients: ${num_clients}
+ fraction: 0.80
+
+model:
+ _target_: null
+ num_head_layers: 2
+ num_classes: 5
+
+fit_config:
+ drop_client: false
+ epochs: ${num_epochs}
+ batch_size: ${batch_size}
+
+strategy:
+ _target_: fedper.server.DefaultStrategyPipeline
+ fraction_fit: 0.00001 # because we want the number of clients to sample on each round to be solely defined by min_fit_clients
+ min_fit_clients: ${clients_per_round}
+ fraction_evaluate: 0.0
+ min_evaluate_clients: ${clients_per_round}
+ min_available_clients: ${num_clients}
+ algorithm: ${algorithm}
+ evaluate_fn: None
+ on_evaluate_config_fn: None
\ No newline at end of file
diff --git a/baselines/fedper/fedper/constants.py b/baselines/fedper/fedper/constants.py
new file mode 100644
index 000000000000..3eda77c5134e
--- /dev/null
+++ b/baselines/fedper/fedper/constants.py
@@ -0,0 +1,23 @@
+"""Constants used in machine learning pipeline."""
+from enum import Enum
+
+
+# FL Algorithms
+class Algorithms(Enum):
+ """Enum for FL algorithms."""
+
+ FEDAVG = "FedAvg"
+ FEDPER = "FedPer"
+
+
+# FL Default Train and Fine-Tuning Epochs
+DEFAULT_TRAIN_EP = 5
+DEFAULT_FT_EP = 5
+
+MEAN = {
+ "cifar10": [0.4915, 0.4823, 0.4468],
+}
+
+STD = {
+ "cifar10": [0.2470, 0.2435, 0.2616],
+}
diff --git a/baselines/fedper/fedper/dataset.py b/baselines/fedper/fedper/dataset.py
new file mode 100644
index 000000000000..81a95286b1b8
--- /dev/null
+++ b/baselines/fedper/fedper/dataset.py
@@ -0,0 +1,85 @@
+"""Handle basic dataset creation.
+
+In case of PyTorch it should return dataloaders for your dataset (for both the clients
+and the server). If you are using a custom dataset class, this module is the place to
+define it. If your dataset requires to be downloaded (and this is not done
+automatically -- e.g. as it is the case for many dataset in TorchVision) and
+partitioned, please include all those functions and logic in the
+`dataset_preparation.py` module. You can use all those functions from functions/methods
+defined here of course.
+"""
+import os
+import pickle
+import sys
+from pathlib import Path
+
+import numpy as np
+
+from fedper.dataset_preparation import (
+ call_dataset,
+ flickr_preprocess,
+ randomly_assign_classes,
+)
+
+# working dir is two up
+WORKING_DIR = Path(__file__).resolve().parent.parent
+FL_BENCH_ROOT = WORKING_DIR.parent
+
+sys.path.append(FL_BENCH_ROOT.as_posix())
+
+
+def dataset_main(config: dict) -> None:
+ """Prepare the dataset."""
+ dataset_name = config["name"].lower()
+ dataset_folder = Path(WORKING_DIR, "datasets")
+ dataset_root = Path(dataset_folder, dataset_name)
+
+ if not os.path.isdir(dataset_root):
+ os.makedirs(dataset_root)
+
+ if dataset_name == "cifar10":
+ dataset = call_dataset(dataset_name=dataset_name, root=dataset_root)
+
+ # randomly assign classes
+ assert config["num_classes"] > 0, "Number of classes must be positive"
+ config["num_classes"] = max(1, min(config["num_classes"], len(dataset.classes)))
+ # partition, stats = randomly_assign_classes(
+ partition = randomly_assign_classes(
+ dataset=dataset,
+ client_num=config["num_clients"],
+ class_num=config["num_classes"],
+ )
+
+ clients_4_train = list(range(config["num_clients"]))
+ clients_4_test = list(range(config["num_clients"]))
+
+ partition["separation"] = {
+ "train": clients_4_train,
+ "test": clients_4_test,
+ "total": config["num_clients"],
+ }
+ for client_id, idx in enumerate(partition["data_indices"]):
+ if config["split"] == "sample":
+ num_train_samples = int(len(idx) * config["fraction"])
+
+ np.random.shuffle(idx)
+ idx_train, idx_test = idx[:num_train_samples], idx[num_train_samples:]
+ partition["data_indices"][client_id] = {
+ "train": idx_train,
+ "test": idx_test,
+ }
+ else:
+ if client_id in clients_4_train:
+ partition["data_indices"][client_id] = {"train": idx, "test": []}
+ else:
+ partition["data_indices"][client_id] = {"train": [], "test": idx}
+ with open(dataset_root / "partition.pkl", "wb") as pickle_file:
+ pickle.dump(partition, pickle_file)
+
+ # with open(dataset_root / "all_stats.json", "w") as f:
+ # json.dump(stats, f)
+
+ elif dataset_name.lower() == "flickr":
+ flickr_preprocess(dataset_root, config)
+ else:
+ raise RuntimeError("Please implement the dataset preparation for your dataset.")
diff --git a/baselines/fedper/fedper/dataset_preparation.py b/baselines/fedper/fedper/dataset_preparation.py
new file mode 100644
index 000000000000..0b8b53782aac
--- /dev/null
+++ b/baselines/fedper/fedper/dataset_preparation.py
@@ -0,0 +1,209 @@
+"""Dataset preparation."""
+import os
+import random
+from collections import Counter
+from pathlib import Path
+from typing import Any, Dict, List, Union
+
+import numpy as np
+import pandas as pd
+import torch
+import torchvision
+from torch.utils.data import Dataset
+from torchvision import transforms
+
+
+class BaseDataset(Dataset):
+ """Base class for all datasets."""
+
+ def __init__(
+ self,
+ root: Path = Path("datasets/cifar10"),
+ general_data_transform: transforms.transforms.Compose = None,
+ ) -> None:
+ """Initialize the dataset."""
+ self.root = root
+ self.classes = None
+ self.data: torch.tensor = None
+ self.targets: torch.tensor = None
+ self.general_data_transform = general_data_transform
+
+ def __getitem__(self, index):
+ """Get the item at the given index."""
+ data, targets = self.data[index], self.targets[index]
+ if self.general_data_transform is not None:
+ data = self.general_data_transform(data)
+ return data, targets
+
+ def __len__(self):
+ """Return the length of the dataset."""
+ return len(self.targets)
+
+
+class CIFAR10(BaseDataset):
+ """CIFAR10 dataset."""
+
+ def __init__(
+ self,
+ root: Path = Path("datasets/cifar10"),
+ general_data_transform=None,
+ ):
+ super().__init__()
+ train_part = torchvision.datasets.CIFAR10(root, True, download=True)
+ test_part = torchvision.datasets.CIFAR10(root, False, download=True)
+ train_data = torch.tensor(train_part.data).permute([0, -1, 1, 2]).float()
+ test_data = torch.tensor(test_part.data).permute([0, -1, 1, 2]).float()
+ train_targets = torch.tensor(train_part.targets).long().squeeze()
+ test_targets = torch.tensor(test_part.targets).long().squeeze()
+ self.data = torch.cat([train_data, test_data])
+ self.targets = torch.cat([train_targets, test_targets])
+ self.classes = train_part.classes
+ self.general_data_transform = general_data_transform
+
+
+def flickr_preprocess(root, config):
+ """Preprocess the FLICKR dataset."""
+ print("Preprocessing FLICKR dataset...")
+ # create a tmp folder to store the preprocessed data
+ tmp_folder = Path(root, "tmp")
+ if not os.path.isdir(tmp_folder):
+ os.makedirs(tmp_folder)
+
+ # remove any folder or file in tmp folder, even if it is not empty
+ os.system(f"rm -rf {tmp_folder.as_posix()}/*")
+
+ # get number of clients
+ num_clients = config["num_clients"]
+ # get flickr image labels per clients
+ df_labelled_igms = pd.read_csv(
+ Path(root, "FLICKR-AES_image_labeled_by_each_worker.csv")
+ )
+ # take num_clients random workers from df
+ # where workers have a minimum of 60 and a maximum of 290 images
+ df_labelled_igms = df_labelled_igms.groupby("worker").filter(
+ lambda x: len(x) >= 60 and len(x) <= 290
+ )
+ # only take workers that have at least 1 image for each score (1-5)
+ df_labelled_igms = df_labelled_igms.groupby("worker").filter(
+ lambda x: len(x[" score"].unique()) == 5
+ )
+ df_labelled_igms = df_labelled_igms.groupby("worker").filter(
+ lambda x: x[" score"].value_counts().min() >= 4
+ )
+ # only take workers that have at least 4 images for each score (1-5)
+
+ # get num_clients random workers
+ clients = np.random.choice(
+ df_labelled_igms["worker"].unique(), num_clients, replace=False
+ )
+ for i, client in enumerate(clients):
+ print(f"Processing client {i}...")
+ df_client = df_labelled_igms[df_labelled_igms["worker"] == client]
+ client_path = Path(tmp_folder, f"client_{i}")
+ if not os.path.isdir(client_path):
+ os.makedirs(client_path)
+ # create score folder in client folder, scores go from 1-5
+ for score in range(1, 6):
+ score_path = Path(client_path, str(score))
+ if not os.path.isdir(score_path):
+ os.makedirs(score_path)
+ # copy images to score folder
+ for _, row in df_client.iterrows():
+ img_path = Path(root, "40K", row[" imagePair"])
+ score_path = Path(client_path, str(row[" score"]))
+ if os.path.isfile(img_path):
+ os.system(f"cp {img_path} {score_path}")
+
+
+def call_dataset(dataset_name, root, **kwargs):
+ """Call the dataset."""
+ if dataset_name == "cifar10":
+ return CIFAR10(root, **kwargs)
+ raise ValueError(f"Dataset {dataset_name} not supported.")
+
+
+def randomly_assign_classes(
+ dataset: Dataset, client_num: int, class_num: int
+) -> Dict[str, Union[Dict[Any, Any], List[Any]]]:
+ # ) -> Dict[str, Any]:
+ """Randomly assign number classes to clients."""
+ partition: Dict[str, Union[Dict, List]] = {"separation": {}, "data_indices": []}
+ data_indices: List[List[int]] = [[] for _ in range(client_num)]
+ targets_numpy = np.array(dataset.targets, dtype=np.int32)
+ label_list = list(range(len(dataset.classes)))
+
+ data_idx_for_each_label = [
+ np.where(targets_numpy == i)[0].tolist() for i in label_list
+ ]
+
+ assigned_labels = []
+ selected_times = [0 for _ in label_list]
+ for _ in range(client_num):
+ sampled_labels = random.sample(label_list, class_num)
+ assigned_labels.append(sampled_labels)
+ for j in sampled_labels:
+ selected_times[j] += 1
+
+ batch_sizes = _get_batch_sizes(
+ targets_numpy=targets_numpy,
+ label_list=label_list,
+ selected_times=selected_times,
+ )
+
+ data_indices = _get_data_indices(
+ batch_sizes=batch_sizes,
+ data_indices=data_indices,
+ data_idx_for_each_label=data_idx_for_each_label,
+ assigned_labels=assigned_labels,
+ client_num=client_num,
+ )
+
+ partition["data_indices"] = data_indices
+
+ return partition # , stats
+
+
+def _get_batch_sizes(
+ targets_numpy: np.ndarray,
+ label_list: List[int],
+ selected_times: List[int],
+) -> np.ndarray:
+ """Get batch sizes for each label."""
+ labels_count = Counter(targets_numpy)
+ batch_sizes = np.zeros_like(label_list)
+ for i in label_list:
+ print(f"label: {i}, count: {labels_count[i]}")
+ print(f"selected times: {selected_times[i]}")
+ batch_sizes[i] = int(labels_count[i] / selected_times[i])
+
+ return batch_sizes
+
+
+def _get_data_indices(
+ batch_sizes: np.ndarray,
+ data_indices: List[List[int]],
+ data_idx_for_each_label: List[List[int]],
+ assigned_labels: List[List[int]],
+ client_num: int,
+) -> List[List[int]]:
+ for i in range(client_num):
+ for cls in assigned_labels[i]:
+ if len(data_idx_for_each_label[cls]) < 2 * batch_sizes[cls]:
+ batch_size = len(data_idx_for_each_label[cls])
+ else:
+ batch_size = batch_sizes[cls]
+ selected_idx = random.sample(data_idx_for_each_label[cls], batch_size)
+ data_indices_use: np.ndarray = np.concatenate(
+ [data_indices[i], selected_idx], axis=0
+ ).astype(np.int64)
+ data_indices[i] = data_indices_use.tolist()
+ # data_indices[i]: np.ndarray = np.concatenate(
+ # [data_indices[i], selected_idx], axis=0
+ # ).astype(np.int64)
+ data_idx_for_each_label[cls] = list(
+ set(data_idx_for_each_label[cls]) - set(selected_idx)
+ )
+
+ data_indices[i] = data_indices[i]
+
+ return data_indices
diff --git a/baselines/fedper/fedper/implemented_models/mobile_model.py b/baselines/fedper/fedper/implemented_models/mobile_model.py
new file mode 100644
index 000000000000..57d3210c9511
--- /dev/null
+++ b/baselines/fedper/fedper/implemented_models/mobile_model.py
@@ -0,0 +1,258 @@
+"""MobileNet-v1 model, model manager and model split."""
+from typing import Dict, List, Optional, Tuple, Union
+
+import torch
+import torch.nn as nn
+from omegaconf import DictConfig
+from torch.utils.data import DataLoader
+
+from fedper.models import ModelManager, ModelSplit
+
+# Set model architecture
+ARCHITECTURE = {
+ "layer_1": {"conv_dw": [32, 64, 1]},
+ "layer_2": {"conv_dw": [64, 128, 2]},
+ "layer_3": {"conv_dw": [128, 128, 1]},
+ "layer_4": {"conv_dw": [128, 256, 2]},
+ "layer_5": {"conv_dw": [256, 256, 1]},
+ "layer_6": {"conv_dw": [256, 512, 2]},
+ "layer_7": {"conv_dw": [512, 512, 1]},
+ "layer_8": {"conv_dw": [512, 512, 1]},
+ "layer_9": {"conv_dw": [512, 512, 1]},
+ "layer_10": {"conv_dw": [512, 512, 1]},
+ "layer_11": {"conv_dw": [512, 512, 1]},
+ "layer_12": {"conv_dw": [512, 1024, 2]},
+ "layer_13": {"conv_dw": [1024, 1024, 1]},
+}
+
+
+class MobileNet(nn.Module):
+ """Model from MobileNet-v1 (https://github.com/wjc852456/pytorch-mobilenet-v1)."""
+
+ def __init__(
+ self,
+ num_head_layers: int = 1,
+ num_classes: int = 10,
+ ) -> None:
+ super(MobileNet, self).__init__()
+
+ self.architecture = ARCHITECTURE
+
+ def conv_bn(inp, oup, stride):
+ return nn.Sequential(
+ nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
+ nn.BatchNorm2d(oup),
+ nn.ReLU(inplace=True),
+ )
+
+ def conv_dw(inp, oup, stride):
+ return nn.Sequential(
+ nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
+ nn.BatchNorm2d(inp),
+ nn.ReLU(inplace=True),
+ nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
+ nn.BatchNorm2d(oup),
+ nn.ReLU(inplace=True),
+ )
+
+ self.body = nn.Sequential()
+ self.body.add_module("initial_batch_norm", conv_bn(3, 32, 2))
+ for i in range(1, 13):
+ for _, value in self.architecture[f"layer_{i}"].items():
+ self.body.add_module(f"conv_dw_{i}", conv_dw(*value))
+
+ self.body.add_module("avg_pool", nn.AvgPool2d([7]))
+ self.body.add_module("fc", nn.Linear(1024, num_classes))
+
+ if num_head_layers == 1:
+ self.head = nn.Sequential(
+ nn.AvgPool2d([7]), nn.Flatten(), nn.Linear(1024, num_classes)
+ )
+ self.body.avg_pool = nn.Identity()
+ self.body.fc = nn.Identity()
+ elif num_head_layers == 2:
+ self.head = nn.Sequential(
+ conv_dw(1024, 1024, 1),
+ nn.AvgPool2d([7]),
+ nn.Flatten(),
+ nn.Linear(1024, num_classes),
+ )
+ self.body.conv_dw_13 = nn.Identity()
+ self.body.avg_pool = nn.Identity()
+ self.body.fc = nn.Identity()
+ elif num_head_layers == 3:
+ self.head = nn.Sequential(
+ conv_dw(512, 1024, 2),
+ conv_dw(1024, 1024, 1),
+ nn.AvgPool2d([7]),
+ nn.Flatten(),
+ nn.Linear(1024, num_classes),
+ )
+ self.body.conv_dw_12 = nn.Identity()
+ self.body.conv_dw_13 = nn.Identity()
+ self.body.avg_pool = nn.Identity()
+ self.body.fc = nn.Identity()
+ elif num_head_layers == 4:
+ self.head = nn.Sequential(
+ conv_dw(512, 512, 1),
+ conv_dw(512, 1024, 2),
+ conv_dw(1024, 1024, 1),
+ nn.AvgPool2d([7]),
+ nn.Flatten(),
+ nn.Linear(1024, num_classes),
+ )
+ self.body.conv_dw_11 = nn.Identity()
+ self.body.conv_dw_12 = nn.Identity()
+ self.body.conv_dw_13 = nn.Identity()
+ self.body.avg_pool = nn.Identity()
+ self.body.fc = nn.Identity()
+ else:
+ raise NotImplementedError("Number of head layers not implemented.")
+
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
+ """Forward pass of the model."""
+ x = self.body(x)
+ return self.head(x)
+
+
+class MobileNetModelSplit(ModelSplit):
+ """Split MobileNet model into body and head."""
+
+ def _get_model_parts(self, model: MobileNet) -> Tuple[nn.Module, nn.Module]:
+ return model.body, model.head
+
+
+class MobileNetModelManager(ModelManager):
+ """Manager for models with Body/Head split."""
+
+ def __init__(
+ self,
+ client_id: int,
+ config: DictConfig,
+ trainloader: DataLoader,
+ testloader: DataLoader,
+ client_save_path: Optional[str] = "",
+ learning_rate: float = 0.01,
+ ):
+ """Initialize the attributes of the model manager.
+
+ Args:
+ client_id: The id of the client.
+ config: Dict containing the configurations to be used by the manager.
+ """
+ super().__init__(
+ model_split_class=MobileNetModelSplit,
+ client_id=client_id,
+ config=config,
+ )
+ self.trainloader, self.testloader = trainloader, testloader
+ self.device = self.config["server_device"]
+ self.client_save_path = client_save_path if client_save_path != "" else None
+ self.learning_rate = learning_rate
+
+ def _create_model(self) -> nn.Module:
+ """Return MobileNet-v1 model to be split into head and body."""
+ try:
+ return MobileNet(
+ num_head_layers=self.config["model"]["num_head_layers"],
+ num_classes=self.config["model"]["num_classes"],
+ ).to(self.device)
+ except AttributeError:
+ self.device = self.config["server_device"]
+ return MobileNet(
+ num_head_layers=self.config["model"]["num_head_layers"],
+ num_classes=self.config["model"]["num_classes"],
+ ).to(self.device)
+
+ def train(
+ self,
+ epochs: int = 1,
+ ) -> Dict[str, Union[List[Dict[str, float]], int, float]]:
+ """Train the model maintained in self.model.
+
+ Method adapted from simple MobileNet-v1 (PyTorch) \
+ https://github.com/wjc852456/pytorch-mobilenet-v1.
+
+ Args:
+ epochs: number of training epochs.
+
+ Returns
+ -------
+ Dict containing the train metrics.
+ """
+ # Load client state (head) if client_save_path is not None and it is not empty
+ if self.client_save_path is not None:
+ try:
+ self.model.head.load_state_dict(torch.load(self.client_save_path))
+ except FileNotFoundError:
+ print("No client state found, training from scratch.")
+ pass
+
+ criterion = torch.nn.CrossEntropyLoss()
+ optimizer = torch.optim.SGD(
+ self.model.parameters(), lr=self.learning_rate, momentum=0.9
+ )
+ correct, total = 0, 0
+ loss: torch.Tensor = 0.0
+ # self.model.train()
+ for _ in range(epochs):
+ for images, labels in self.trainloader:
+ optimizer.zero_grad()
+ outputs = self.model(images.to(self.device))
+ labels = labels.to(self.device)
+ loss = criterion(outputs, labels)
+ loss.backward()
+ optimizer.step()
+ total += labels.size(0)
+ correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
+
+ # Save client state (head)
+ if self.client_save_path is not None:
+ torch.save(self.model.head.state_dict(), self.client_save_path)
+
+ return {"loss": loss.item(), "accuracy": correct / total}
+
+ def test(
+ self,
+ ) -> Dict[str, float]:
+ """Test the model maintained in self.model.
+
+ Returns
+ -------
+ Dict containing the test metrics.
+ """
+ # Load client state (head)
+ if self.client_save_path is not None:
+ self.model.head.load_state_dict(torch.load(self.client_save_path))
+
+ criterion = torch.nn.CrossEntropyLoss()
+ correct, total, loss = 0, 0, 0.0
+ # self.model.eval()
+ with torch.no_grad():
+ for images, labels in self.testloader:
+ outputs = self.model(images.to(self.device))
+ labels = labels.to(self.device)
+ loss += criterion(outputs, labels).item()
+ total += labels.size(0)
+ correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
+ print("Test Accuracy: {:.4f}".format(correct / total))
+
+ if self.client_save_path is not None:
+ torch.save(self.model.head.state_dict(), self.client_save_path)
+
+ return {
+ "loss": loss / len(self.testloader.dataset),
+ "accuracy": correct / total,
+ }
+
+ def train_dataset_size(self) -> int:
+ """Return train data set size."""
+ return len(self.trainloader)
+
+ def test_dataset_size(self) -> int:
+ """Return test data set size."""
+ return len(self.testloader)
+
+ def total_dataset_size(self) -> int:
+ """Return total data set size."""
+ return len(self.trainloader) + len(self.testloader)
diff --git a/baselines/fedper/fedper/implemented_models/resnet_model.py b/baselines/fedper/fedper/implemented_models/resnet_model.py
new file mode 100644
index 000000000000..0d9837b118a3
--- /dev/null
+++ b/baselines/fedper/fedper/implemented_models/resnet_model.py
@@ -0,0 +1,272 @@
+"""ResNet model, model manager and split."""
+from typing import Dict, List, Optional, Tuple, Union
+
+import torch
+import torch.nn as nn
+from omegaconf import DictConfig
+from torch.utils.data import DataLoader
+from torchvision.models.resnet import resnet34
+
+from fedper.models import ModelManager, ModelSplit
+
+
+def conv3x3(
+ in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1
+) -> nn.Conv2d:
+ """3x3 convolution with padding."""
+ return nn.Conv2d(
+ in_planes,
+ out_planes,
+ kernel_size=3,
+ stride=stride,
+ padding=dilation,
+ groups=groups,
+ bias=False,
+ dilation=dilation,
+ )
+
+
+def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
+ """1x1 convolution."""
+ return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
+
+
+class BasicBlock(nn.Module):
+ """Basic block for ResNet."""
+
+ expansion: int = 1
+
+ def __init__(
+ self,
+ inplanes: int,
+ planes: int,
+ stride: int = 1,
+ downsample: Optional[nn.Module] = None,
+ ) -> None:
+ super().__init__()
+ norm_layer = nn.BatchNorm2d
+ # Both self.conv1 and self.downsample layers downsample input when stride != 1
+ self.conv1 = conv3x3(inplanes, planes, stride)
+ self.bn1 = norm_layer(planes)
+ self.relu = nn.ReLU(inplace=True)
+ self.conv2 = conv3x3(planes, planes)
+ self.bn2 = norm_layer(planes)
+ self.downsample = downsample
+ self.stride = stride
+
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
+ """Forward inputs through the block."""
+ identity = x
+
+ out = self.conv1(x)
+ out = self.bn1(out)
+ out = self.relu(out)
+
+ out = self.conv2(out)
+ out = self.bn2(out)
+
+ if self.downsample is not None:
+ identity = self.downsample(x)
+
+ out += identity
+ out = self.relu(out)
+
+ return out
+
+
+class ResNet(nn.Module):
+ """ResNet model."""
+
+ def __init__(
+ self,
+ num_head_layers: int = 1,
+ num_classes: int = 10,
+ ) -> None:
+ super(ResNet, self).__init__()
+ assert (
+ num_head_layers > 0 and num_head_layers <= 17
+ ), "num_head_layers must be greater than 0 and less than 16"
+
+ self.num_head_layers = num_head_layers
+ self.body = resnet34()
+
+ # if only one head layer
+ if self.num_head_layers == 1:
+ self.head = self.body.fc
+ self.body.fc = nn.Identity()
+ elif self.num_head_layers == 2:
+ self.head = nn.Sequential(
+ BasicBlock(512, 512),
+ nn.AdaptiveAvgPool2d((1, 1)),
+ nn.Flatten(),
+ nn.Linear(512, num_classes),
+ )
+ # remove head layers from body
+ self.body = nn.Sequential(*list(self.body.children())[:-2])
+ body_layer4 = list(self.body.children())[-1]
+ self.body = nn.Sequential(*list(self.body.children())[:-1])
+ self.body.layer4 = nn.Sequential(*list(body_layer4.children())[:-1])
+ elif self.num_head_layers == 3:
+ self.head = nn.Sequential(
+ BasicBlock(512, 512),
+ BasicBlock(512, 512),
+ nn.AdaptiveAvgPool2d((1, 1)),
+ nn.Flatten(),
+ nn.Linear(512, num_classes),
+ )
+ # remove head layers from body
+ self.body = nn.Sequential(*list(self.body.children())[:-2])
+ body_layer4 = list(self.body.children())[-1]
+ self.body = nn.Sequential(*list(self.body.children())[:-1])
+ self.body.layer4 = nn.Sequential(*list(body_layer4.children())[:-2])
+ else:
+ raise NotImplementedError("Only 1 or 2 head layers supported")
+
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
+ """Forward inputs through the model."""
+ print("Forwarding through ResNet model")
+ x = self.body(x)
+ return self.head(x)
+
+
+class ResNetModelSplit(ModelSplit):
+ """Split ResNet model into body and head."""
+
+ def _get_model_parts(self, model: ResNet) -> Tuple[nn.Module, nn.Module]:
+ return model.body, model.head
+
+
+class ResNetModelManager(ModelManager):
+ """Manager for models with Body/Head split."""
+
+ def __init__(
+ self,
+ client_save_path: Optional[str],
+ client_id: int,
+ config: DictConfig,
+ trainloader: DataLoader,
+ testloader: DataLoader,
+ learning_rate: float = 0.01,
+ ):
+ """Initialize the attributes of the model manager.
+
+ Args:
+ client_save_path: Path to save the client state.
+ client_id: The id of the client.
+ config: Dict containing the configurations to be used by the manager.
+ trainloader: DataLoader containing the train data.
+ testloader: DataLoader containing the test data.
+ learning_rate: Learning rate for the optimizer.
+ """
+ super().__init__(
+ model_split_class=ResNetModelSplit,
+ client_id=client_id,
+ config=config,
+ )
+ self.client_save_path = client_save_path
+ self.trainloader, self.testloader = trainloader, testloader
+ self.device = self.config["server_device"]
+ self.learning_rate = learning_rate
+
+ def _create_model(self) -> nn.Module:
+ """Return ResNet model to be split into head and body."""
+ try:
+ return ResNet(
+ num_head_layers=self.config["model"]["num_head_layers"],
+ num_classes=self.config["model"]["num_classes"],
+ ).to(self.device)
+ except AttributeError:
+ self.device = self.config["server_device"]
+ return ResNet(
+ num_head_layers=self.config["model"]["num_head_layers"],
+ num_classes=self.config["model"]["num_classes"],
+ ).to(self.device)
+
+ def train(
+ self,
+ epochs: int = 1,
+ ) -> Dict[str, Union[List[Dict[str, float]], int, float]]:
+ """Train the model maintained in self.model.
+
+ Method adapted from simple MobileNet-v1 (PyTorch) \
+ https://github.com/wjc852456/pytorch-mobilenet-v1.
+
+ Args:
+ epochs: number of training epochs.
+
+ Returns
+ -------
+ Dict containing the train metrics.
+ """
+ # Load client state (head) if client_save_path is not None and it is not empty
+ if self.client_save_path is not None:
+ try:
+ self.model.head.load_state_dict(torch.load(self.client_save_path))
+ except FileNotFoundError:
+ print("No client state found, training from scratch.")
+ pass
+
+ criterion = torch.nn.CrossEntropyLoss()
+ optimizer = torch.optim.SGD(
+ self.model.parameters(), lr=self.learning_rate, momentum=0.9
+ )
+ correct, total = 0, 0
+ loss: torch.Tensor = 0.0
+ # self.model.train()
+ for _ in range(epochs):
+ for images, labels in self.trainloader:
+ optimizer.zero_grad()
+ outputs = self.model(images.to(self.device))
+ labels = labels.to(self.device)
+ loss = criterion(outputs, labels)
+ loss.backward()
+
+ optimizer.step()
+ total += labels.size(0)
+ correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
+
+ # Save client state (head)
+ if self.client_save_path is not None:
+ torch.save(self.model.head.state_dict(), self.client_save_path)
+
+ return {"loss": loss.item(), "accuracy": correct / total}
+
+ def test(
+ self,
+ ) -> Dict[str, float]:
+ """Test the model maintained in self.model."""
+ # Load client state (head)
+ if self.client_save_path is not None:
+ self.model.head.load_state_dict(torch.load(self.client_save_path))
+
+ criterion = torch.nn.CrossEntropyLoss()
+ correct, total, loss = 0, 0, 0.0
+ # self.model.eval()
+ with torch.no_grad():
+ for images, labels in self.testloader:
+ outputs = self.model(images.to(self.device))
+ labels = labels.to(self.device)
+ loss += criterion(outputs, labels).item()
+ total += labels.size(0)
+ correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
+ print("Test Accuracy: {:.4f}".format(correct / total))
+
+ if self.client_save_path is not None:
+ torch.save(self.model.head.state_dict(), self.client_save_path)
+
+ return {
+ "loss": loss / len(self.testloader.dataset),
+ "accuracy": correct / total,
+ }
+
+ def train_dataset_size(self) -> int:
+ """Return train data set size."""
+ return len(self.trainloader)
+
+ def test_dataset_size(self) -> int:
+ """Return test data set size."""
+ return len(self.testloader)
+
+ def total_dataset_size(self) -> int:
+ """Return total data set size."""
+ return len(self.trainloader) + len(self.testloader)
diff --git a/baselines/fedper/fedper/main.py b/baselines/fedper/fedper/main.py
new file mode 100644
index 000000000000..b421b2e0442c
--- /dev/null
+++ b/baselines/fedper/fedper/main.py
@@ -0,0 +1,126 @@
+"""Create and connect the building blocks for your experiments; start the simulation.
+
+It includes preprocessing the dataset, instantiating the strategy, specifying how the global
+model is going to be evaluated, etc. At the end, this script saves the results.
+"""
+
+from pathlib import Path
+
+import flwr as fl
+import hydra
+from hydra.core.hydra_config import HydraConfig
+from hydra.utils import instantiate
+from omegaconf import DictConfig, OmegaConf
+
+from fedper.dataset import dataset_main
+from fedper.utils import (
+ get_client_fn,
+ get_create_model_fn,
+ plot_metric_from_history,
+ save_results_as_pickle,
+ set_client_state_save_path,
+ set_model_class,
+ set_num_classes,
+ set_server_target,
+)
+
+
+@hydra.main(config_path="conf", config_name="base", version_base=None)
+def main(cfg: DictConfig) -> None:
+ """Run the baseline.
+
+ Parameters
+ ----------
+ cfg : DictConfig
+ An omegaconf object that stores the hydra config.
+ """
+ # 1. Print parsed config
+ # Set the model class, server target, and number of classes
+ cfg = set_model_class(cfg)
+ cfg = set_server_target(cfg)
+ cfg = set_num_classes(cfg)
+
+ print(OmegaConf.to_yaml(cfg))
+
+ # Create directory to store client states if it does not exist
+ # Client state has subdirectories with the name of current time
+ client_state_save_path = set_client_state_save_path()
+
+ # 2. Prepare your dataset
+ dataset_main(cfg.dataset)
+
+ # 3. Define your clients
+ # Get client function
+ client_fn = get_client_fn(
+ config=cfg,
+ client_state_save_path=client_state_save_path,
+ )
+
+ # get a function that will be used to construct the config that the client's
+ # fit() method will receive
+ def get_on_fit_config():
+ def fit_config_fn(server_round: int):
+ # resolve and convert to python dict
+ fit_config = OmegaConf.to_container(cfg.fit_config, resolve=True)
+ _ = server_round
+ return fit_config
+
+ return fit_config_fn
+
+ # get a function that will be used to construct the model
+ create_model, split = get_create_model_fn(cfg)
+
+ # 4. Define your strategy
+ strategy = instantiate(
+ cfg.strategy,
+ create_model=create_model,
+ on_fit_config_fn=get_on_fit_config(),
+ model_split_class=split,
+ )
+
+ # 5. Start Simulation
+ history = fl.simulation.start_simulation(
+ client_fn=client_fn,
+ num_clients=cfg.num_clients,
+ config=fl.server.ServerConfig(num_rounds=cfg.num_rounds),
+ client_resources={
+ "num_cpus": cfg.client_resources.num_cpus,
+ "num_gpus": cfg.client_resources.num_gpus,
+ },
+ strategy=strategy,
+ )
+
+ # Experiment completed. Now we save the results and
+ # generate plots using the `history`
+ print("................")
+ print(history)
+
+ # 6. Save your results
+ save_path = Path(HydraConfig.get().runtime.output_dir)
+
+ # save results as a Python pickle using a file_path
+ # the directory created by Hydra for each run
+ save_results_as_pickle(
+ history,
+ file_path=save_path,
+ )
+ # plot results and include them in the readme
+ strategy_name = strategy.__class__.__name__
+ file_suffix: str = (
+ f"_{strategy_name}"
+ f"_C={cfg.num_clients}"
+ f"_B={cfg.batch_size}"
+ f"_E={cfg.num_epochs}"
+ f"_R={cfg.num_rounds}"
+ f"_lr={cfg.learning_rate}"
+ )
+
+ plot_metric_from_history(
+ history,
+ save_path,
+ (file_suffix),
+ )
+
+
+if __name__ == "__main__":
+ main()
diff --git a/baselines/fedper/fedper/models.py b/baselines/fedper/fedper/models.py
new file mode 100644
index 000000000000..2a2ebde158f8
--- /dev/null
+++ b/baselines/fedper/fedper/models.py
@@ -0,0 +1,189 @@
+"""Abstract class for splitting a model into body and head."""
+from abc import ABC, abstractmethod
+from collections import OrderedDict
+from typing import Any, Dict, List, Tuple, Type, Union
+
+import numpy as np
+from omegaconf import DictConfig
+from torch import Tensor
+from torch import nn as nn
+
+
+class ModelSplit(ABC, nn.Module):
+    """Abstract class for splitting a model into body and head.
+
+    Subclasses decide where the split lies by implementing
+    `_get_model_parts`. The body is the shared (aggregated) part and the
+    head the personalized part in the FedPer setting.
+    """
+
+    def __init__(
+        self,
+        model: nn.Module,
+    ):
+        """Initialize the attributes of the model split.
+
+        Args:
+            model: the full model to be split into body and head.
+        """
+        super().__init__()
+
+        self._body, self._head = self._get_model_parts(model)
+
+    @abstractmethod
+    def _get_model_parts(self, model: nn.Module) -> Tuple[nn.Module, nn.Module]:
+        """Return the body and head of the model.
+
+        Args:
+            model: model to be split into head and body
+
+        Returns
+        -------
+        Tuple where the first element is the body of the model
+        and the second is the head.
+        """
+
+    @property
+    def body(self) -> nn.Module:
+        """Return model body."""
+        return self._body
+
+    @body.setter
+    def body(self, state_dict: "OrderedDict[str, Tensor]") -> None:
+        """Set model body.
+
+        Note: unlike a conventional setter, this consumes a state_dict and
+        loads it into the existing body module (strict key matching).
+
+        Args:
+            state_dict: dictionary of the state to set the model body to.
+        """
+        self.body.load_state_dict(state_dict, strict=True)
+
+    @property
+    def head(self) -> nn.Module:
+        """Return model head."""
+        return self._head
+
+    @head.setter
+    def head(self, state_dict: "OrderedDict[str, Tensor]") -> None:
+        """Set model head.
+
+        Note: consumes a state_dict and loads it into the existing head
+        module (strict key matching).
+
+        Args:
+            state_dict: dictionary of the state to set the model head to.
+        """
+        self.head.load_state_dict(state_dict, strict=True)
+
+    def get_parameters(self) -> List[np.ndarray]:
+        """Get model parameters as numpy arrays.
+
+        Returns
+        -------
+        Body parameters followed by head parameters.
+        """
+        return [
+            val.cpu().numpy()
+            for val in [
+                *self.body.state_dict().values(),
+                *self.head.state_dict().values(),
+            ]
+        ]
+
+    def set_parameters(self, state_dict: Dict[str, Tensor]) -> None:
+        """Set model parameters.
+
+        Only the keys present in `state_dict` are overwritten; remaining
+        entries keep their current values (hence `strict=False` below).
+
+        Args:
+            state_dict: dictionary of the state to set the model to.
+        """
+        ordered_state_dict = OrderedDict(self.state_dict().copy())
+        # Update with the values of the state_dict
+        ordered_state_dict.update(dict(state_dict.items()))
+        self.load_state_dict(ordered_state_dict, strict=False)
+
+    def enable_head(self) -> None:
+        """Enable gradient tracking for the head parameters."""
+        for param in self.head.parameters():
+            param.requires_grad = True
+
+    def enable_body(self) -> None:
+        """Enable gradient tracking for the body parameters."""
+        for param in self.body.parameters():
+            param.requires_grad = True
+
+    def disable_head(self) -> None:
+        """Disable gradient tracking for the head parameters."""
+        for param in self.head.parameters():
+            param.requires_grad = False
+
+    def disable_body(self) -> None:
+        """Disable gradient tracking for the body parameters."""
+        for param in self.body.parameters():
+            param.requires_grad = False
+
+    def forward(self, inputs: Any) -> Any:
+        """Forward inputs through the body and the head."""
+        x = self.body(inputs)
+        return self.head(x)
+
+
+class ModelManager(ABC):
+    """Manager for models with Body/Head split.
+
+    Owns a `ModelSplit`-wrapped model for one client and defines the
+    train/test interface concrete managers must implement.
+    """
+
+    def __init__(
+        self,
+        client_id: int,
+        config: DictConfig,
+        model_split_class: Type[Any],  # ModelSplit
+    ):
+        """Initialize the attributes of the model manager.
+
+        Args:
+            client_id: The id of the client.
+            config: Dict containing the configurations to be used by the manager.
+            model_split_class: Class to be used to split the model into body and head\
+                (concrete implementation of ModelSplit).
+        """
+        super().__init__()
+
+        self.client_id = client_id
+        self.config = config
+        # Build the concrete model, then wrap it so body/head can be
+        # addressed (and aggregated) independently.
+        self._model = model_split_class(self._create_model())
+
+    @abstractmethod
+    def _create_model(self) -> nn.Module:
+        """Return model to be split into head and body."""
+
+    @abstractmethod
+    def train(
+        self,
+        epochs: int = 1,
+    ) -> Dict[str, Union[List[Dict[str, float]], int, float]]:
+        """Train the model maintained in self.model.
+
+        Args:
+            epochs: number of training epochs.
+
+        Returns
+        -------
+        Dict containing the train metrics.
+        """
+
+    @abstractmethod
+    def test(
+        self,
+    ) -> Dict[str, float]:
+        """Test the model maintained in self.model.
+
+        Returns
+        -------
+        Dict containing the test metrics.
+        """
+
+    @abstractmethod
+    def train_dataset_size(self) -> int:
+        """Return train data set size."""
+
+    @abstractmethod
+    def test_dataset_size(self) -> int:
+        """Return test data set size."""
+
+    @abstractmethod
+    def total_dataset_size(self) -> int:
+        """Return total data set size."""
+
+    @property
+    def model(self) -> nn.Module:
+        """Return the managed (split-wrapped) model."""
+        return self._model
diff --git a/baselines/fedper/fedper/run_figures.sh b/baselines/fedper/fedper/run_figures.sh
new file mode 100755
index 000000000000..9f7382412465
--- /dev/null
+++ b/baselines/fedper/fedper/run_figures.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+# Reproduce the FedPer figure experiments by sweeping dataset / model /
+# algorithm combinations. Run from the directory where the `fedper`
+# package is importable (i.e. the baseline root), e.g. `bash fedper/run_figures.sh`.
+
+# CIFAR10 Mobile and Resnet (non-iid n classes (FIGURE 2a&b))
+for model in mobile resnet
+do
+    for num_classes in 4 8 10
+    do
+        for algorithm in fedper fedavg
+        do
+            python -m fedper.main --config-path conf --config-name cifar10 dataset.num_classes=${num_classes} model_name=${model} algorithm=${algorithm}
+        done
+    done
+done
+
+
+# CIFAR10 Mobile (n head layers (FIGURE 4a))
+for num_head_layers in 2 3 4
+do
+    python -m fedper.main --config-path conf --config-name cifar10 dataset.num_classes=4 model.num_head_layers=${num_head_layers} num_rounds=25 model_name=mobile algorithm=fedper
+done
+python -m fedper.main --config-path conf --config-name cifar10 num_rounds=25 model_name=mobile dataset.num_classes=4
+
+# CIFAR10 Resnet (n head layers (FIGURE 4b))
+for num_head_layers in 1 2 3
+do
+    python -m fedper.main --config-path conf --config-name cifar10 dataset.num_classes=4 model.num_head_layers=${num_head_layers} num_rounds=25 model_name=resnet algorithm=fedper
+done
+python -m fedper.main --config-path conf --config-name cifar10 num_rounds=25 model_name=resnet dataset.num_classes=4
+
+# FLICKR
+for model in mobile resnet
+do
+    python -m fedper.main --config-path conf --config-name flickr model.num_head_layers=2 model_name=${model} algorithm=fedper num_rounds=35
+    python -m fedper.main --config-path conf --config-name flickr model_name=${model} algorithm=fedavg num_rounds=35
+done
+
diff --git a/baselines/fedper/fedper/server.py b/baselines/fedper/fedper/server.py
new file mode 100644
index 000000000000..93616f50f45a
--- /dev/null
+++ b/baselines/fedper/fedper/server.py
@@ -0,0 +1,24 @@
+"""Server strategies pipelines for FedPer."""
+from flwr.server.strategy.fedavg import FedAvg
+
+from fedper.strategy import (
+ AggregateBodyStrategy,
+ AggregateFullStrategy,
+ ServerInitializationStrategy,
+)
+
+
+class InitializationStrategyPipeline(ServerInitializationStrategy):
+    """Initialization strategy pipeline.
+
+    Base pipeline: contributes server-side parameter initialization only
+    (see `ServerInitializationStrategy`).
+    """
+
+
+class AggregateBodyStrategyPipeline(
+    InitializationStrategyPipeline, AggregateBodyStrategy, FedAvg
+):
+    """Aggregate body strategy pipeline (FedPer).
+
+    Composes initialization with body-only aggregation on top of FedAvg:
+    only `_body`-prefixed parameters are averaged across clients.
+    """
+
+
+class DefaultStrategyPipeline(
+    InitializationStrategyPipeline, AggregateFullStrategy, FedAvg
+):
+    """Default strategy pipeline (FedAvg): aggregates the full model."""
diff --git a/baselines/fedper/fedper/strategy.py b/baselines/fedper/fedper/strategy.py
new file mode 100644
index 000000000000..5ae55086db2f
--- /dev/null
+++ b/baselines/fedper/fedper/strategy.py
@@ -0,0 +1,437 @@
+"""FL server strategies."""
+from collections import OrderedDict
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
+
+import torch
+from flwr.common import (
+ EvaluateIns,
+ EvaluateRes,
+ FitIns,
+ FitRes,
+ NDArrays,
+ Parameters,
+ Scalar,
+ ndarrays_to_parameters,
+ parameters_to_ndarrays,
+)
+from flwr.server.client_manager import ClientManager
+from flwr.server.client_proxy import ClientProxy
+from flwr.server.strategy.fedavg import FedAvg
+from torch import nn as nn
+
+from fedper.constants import Algorithms
+from fedper.implemented_models.mobile_model import MobileNetModelSplit
+from fedper.implemented_models.resnet_model import ResNetModelSplit
+from fedper.models import ModelSplit
+
+
+class ServerInitializationStrategy(FedAvg):
+ """Server FL Parameter Initialization strategy implementation."""
+
+ def __init__(
+ self,
+ *args: Any,
+ model_split_class: Union[
+ Type[MobileNetModelSplit], Type[ModelSplit], Type[ResNetModelSplit]
+ ],
+ create_model: Callable[[], nn.Module],
+ initial_parameters: Optional[Parameters] = None,
+ on_fit_config_fn: Optional[Callable[[int], Dict[str, Any]]] = None,
+ evaluate_fn: Optional[
+ Callable[
+ [int, NDArrays, Dict[str, Scalar]],
+ Optional[Tuple[float, Dict[str, Scalar]]],
+ ]
+ ] = None,
+ min_available_clients: int = 1,
+ min_evaluate_clients: int = 1,
+ min_fit_clients: int = 1,
+ algorithm: str = Algorithms.FEDPER.value,
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ _ = evaluate_fn
+ self.on_fit_config_fn = on_fit_config_fn
+ self.initial_parameters = initial_parameters
+ self.min_available_clients = min_available_clients
+ self.min_evaluate_clients = min_evaluate_clients
+ self.min_fit_clients = min_fit_clients
+ self.algorithm = algorithm
+ self.model = model_split_class(model=create_model())
+
+ def initialize_parameters(
+ self, client_manager: ClientManager
+ ) -> Optional[Parameters]:
+ """Initialize the (global) model parameters.
+
+ Args:
+ client_manager: ClientManager. The client manager which holds all currently
+ connected clients.
+
+ Returns
+ -------
+ If parameters are returned, then the server will treat these as the
+ initial global model parameters.
+ """
+ initial_parameters: Optional[Parameters] = self.initial_parameters
+ self.initial_parameters = None # Don't keep initial parameters in memory
+ if initial_parameters is None and self.model is not None:
+ if self.algorithm == Algorithms.FEDPER.value:
+ initial_parameters_use = [
+ val.cpu().numpy() for _, val in self.model.body.state_dict().items()
+ ]
+ else: # FedAvg
+ initial_parameters_use = [
+ val.cpu().numpy() for _, val in self.model.state_dict().items()
+ ]
+
+ if isinstance(initial_parameters_use, list):
+ initial_parameters = ndarrays_to_parameters(initial_parameters_use)
+ return initial_parameters
+
+
+class AggregateFullStrategy(ServerInitializationStrategy):
+ """Full model aggregation strategy implementation."""
+
+ def __init__(self, *args, save_path: Path = Path(""), **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self.save_path = save_path if save_path != "" else None
+ if save_path is not None:
+ self.save_path = save_path / "models"
+ self.save_path.mkdir(parents=True, exist_ok=True)
+
+ def configure_evaluate(
+ self, server_round: int, parameters: Parameters, client_manager: ClientManager
+ ) -> List[Tuple[ClientProxy, EvaluateIns]]:
+ """Configure the next round of evaluation.
+
+ Args:
+ server_round: The current round of federated learning.
+ parameters: The current (global) model parameters.
+ client_manager: The client manager which holds all currently
+ connected clients.
+
+ Returns
+ -------
+ A list of tuples. Each tuple in the list identifies a `ClientProxy` and the
+ `EvaluateIns` for this particular `ClientProxy`. If a particular
+ `ClientProxy` is not included in this list, it means that this
+ `ClientProxy` will not participate in the next round of federated
+ evaluation.
+ """
+ # Same as superclass method but adds the head
+
+ # Parameters and config
+ config: Dict[Any, Any] = {}
+
+ weights = parameters_to_ndarrays(parameters)
+
+ parameters = ndarrays_to_parameters(weights)
+
+ evaluate_ins = EvaluateIns(parameters, config)
+
+ # Sample clients
+ if server_round >= 0:
+ # Sample clients
+ sample_size, min_num_clients = self.num_evaluation_clients(
+ client_manager.num_available()
+ )
+ clients = client_manager.sample(
+ num_clients=sample_size,
+ min_num_clients=min_num_clients,
+ )
+ else:
+ clients = list(client_manager.all().values())
+
+ # Return client/config pairs
+ return [(client, evaluate_ins) for client in clients]
+
+ def aggregate_fit(
+ self,
+ server_round: int,
+ results: List[Tuple[ClientProxy, FitRes]],
+ failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
+ ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
+ """Aggregate received local parameters, set global model parameters and save.
+
+ Args:
+ server_round: The current round of federated learning.
+ results: Successful updates from the previously selected and configured
+ clients. Each pair of `(ClientProxy, FitRes)` constitutes a
+ successful update from one of the previously selected clients. Not
+ that not all previously selected clients are necessarily included in
+ this list: a client might drop out and not submit a result. For each
+ client that did not submit an update, there should be an `Exception`
+ in `failures`.
+ failures: Exceptions that occurred while the server was waiting for client
+ updates.
+
+ Returns
+ -------
+ If parameters are returned, then the server will treat these as the
+ new global model parameters (i.e., it will replace the previous
+ parameters with the ones returned from this method). If `None` is
+ returned (e.g., because there were only failures and no viable
+ results) then the server will no update the previous model
+ parameters, the updates received in this round are discarded, and
+ the global model parameters remain the same.
+ """
+ agg_params, agg_metrics = super().aggregate_fit(
+ server_round=server_round, results=results, failures=failures
+ )
+ if agg_params is not None:
+ # Update Server Model
+ parameters = parameters_to_ndarrays(agg_params)
+ model_keys = [
+ k
+ for k in self.model.state_dict().keys()
+ if k.startswith("_body") or k.startswith("_head")
+ ]
+ params_dict = zip(model_keys, parameters)
+ state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
+ self.model.set_parameters(state_dict)
+
+ if self.save_path is not None:
+ # Save Model
+ torch.save(self.model, self.save_path / f"model-ep_{server_round}.pt")
+
+ return agg_params, agg_metrics
+
+ def aggregate_evaluate(
+ self,
+ server_round: int,
+ results: List[Tuple[ClientProxy, EvaluateRes]],
+ failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
+ ) -> Tuple[Optional[float], Dict[str, Scalar]]:
+ """Aggregate the received local parameters and store the test aggregated.
+
+ Args:
+ server_round: The current round of federated learning.
+ results: Successful updates from the
+ previously selected and configured clients. Each pair of
+ `(ClientProxy, FitRes` constitutes a successful update from one of the
+ previously selected clients. Not that not all previously selected
+ clients are necessarily included in this list: a client might drop out
+ and not submit a result. For each client that did not submit an update,
+ there should be an `Exception` in `failures`.
+ failures: Exceptions that occurred while the server
+ was waiting for client updates.
+
+ Returns
+ -------
+ Optional `float` representing the aggregated evaluation result. Aggregation
+ typically uses some variant of a weighted average.
+ """
+ aggregated_loss, aggregated_metrics = super().aggregate_evaluate(
+ server_round=server_round, results=results, failures=failures
+ )
+ _ = aggregated_metrics # Avoid unused variable warning
+
+ # Weigh accuracy of each client by number of examples used
+ accuracies: List[float] = []
+ for _, res in results:
+ accuracy: float = float(res.metrics["accuracy"])
+ accuracies.append(accuracy)
+ print(f"Round {server_round} accuracies: {accuracies}")
+
+ # Aggregate and print custom metric
+ averaged_accuracy = sum(accuracies) / len(accuracies)
+ print(f"Round {server_round} accuracy averaged: {averaged_accuracy}")
+ return aggregated_loss, {"accuracy": averaged_accuracy}
+
+
+class AggregateBodyStrategy(ServerInitializationStrategy):
+ """Body Aggregation strategy implementation."""
+
+ def __init__(self, *args, save_path: Path = Path(""), **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self.save_path = save_path if save_path != "" else None
+ if save_path is not None:
+ self.save_path = save_path / "models"
+ self.save_path.mkdir(parents=True, exist_ok=True)
+
+ def configure_fit(
+ self, server_round: int, parameters: Parameters, client_manager: ClientManager
+ ) -> List[Tuple[ClientProxy, FitIns]]:
+ """Configure the next round of training.
+
+ Args:
+ server_round: The current round of federated learning.
+ parameters: The current (global) model parameters.
+ client_manager: The client manager which holds all
+ currently connected clients.
+
+ Returns
+ -------
+ A list of tuples. Each tuple in the list identifies a `ClientProxy` and the
+ `FitIns` for this particular `ClientProxy`. If a particular `ClientProxy`
+ is not included in this list, it means that this `ClientProxy`
+ will not participate in the next round of federated learning.
+ """
+ # Same as superclass method but adds the head
+
+ config = {}
+ if self.on_fit_config_fn is not None:
+ # Custom fit config function provided
+ config = self.on_fit_config_fn(server_round)
+
+ weights = parameters_to_ndarrays(parameters)
+
+ # Add head parameters to received body parameters
+ weights.extend(
+ [val.cpu().numpy() for _, val in self.model.head.state_dict().items()]
+ )
+
+ parameters = ndarrays_to_parameters(weights)
+
+ fit_ins = FitIns(parameters, config)
+
+ # Sample clients
+ clients = client_manager.sample(
+ num_clients=self.min_available_clients, min_num_clients=self.min_fit_clients
+ )
+
+ # Return client/config pairs
+ return [(client, fit_ins) for client in clients]
+
+ def configure_evaluate(
+ self, server_round: int, parameters: Parameters, client_manager: ClientManager
+ ) -> List[Tuple[ClientProxy, EvaluateIns]]:
+ """Configure the next round of evaluation.
+
+ Args:
+ server_round: The current round of federated learning.
+ parameters: The current (global) model parameters.
+ client_manager: The client manager which holds all currently
+ connected clients.
+
+ Returns
+ -------
+ A list of tuples. Each tuple in the list identifies a `ClientProxy` and the
+ `EvaluateIns` for this particular `ClientProxy`. If a particular
+ `ClientProxy` is not included in this list, it means that this
+ `ClientProxy` will not participate in the next round of federated
+ evaluation.
+ """
+ # Same as superclass method but adds the head
+
+ # Parameters and config
+ config: Dict[Any, Any] = {}
+
+ weights = parameters_to_ndarrays(parameters)
+
+ # Add head parameters to received body parameters
+ weights.extend(
+ [val.cpu().numpy() for _, val in self.model.head.state_dict().items()]
+ )
+
+ parameters = ndarrays_to_parameters(weights)
+
+ evaluate_ins = EvaluateIns(parameters, config)
+
+ # Sample clients
+ if server_round >= 0:
+ # Sample clients
+ sample_size, min_num_clients = self.num_evaluation_clients(
+ client_manager.num_available()
+ )
+ clients = client_manager.sample(
+ num_clients=sample_size,
+ min_num_clients=min_num_clients,
+ )
+ else:
+ clients = list(client_manager.all().values())
+
+ # Return client/config pairs
+ return [(client, evaluate_ins) for client in clients]
+
+ def aggregate_fit(
+ self,
+ server_round: int,
+ results: List[Tuple[ClientProxy, FitRes]],
+ failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
+ ) -> Tuple[Optional[Parameters], Dict[str, Union[bool, bytes, float, int, str]]]:
+ """Aggregate received local parameters, set global model parameters and save.
+
+ Args:
+ server_round: The current round of federated learning.
+ results: Successful updates from the previously selected and configured
+ clients. Each pair of `(ClientProxy, FitRes)` constitutes a
+ successful update from one of the previously selected clients. Not
+ that not all previously selected clients are necessarily included in
+ this list: a client might drop out and not submit a result. For each
+ client that did not submit an update, there should be an `Exception`
+ in `failures`.
+ failures: Exceptions that occurred while the server was waiting for client
+ updates.
+
+ Returns
+ -------
+ If parameters are returned, then the server will treat these as the
+ new global model parameters (i.e., it will replace the previous
+ parameters with the ones returned from this method). If `None` is
+ returned (e.g., because there were only failures and no viable
+ results) then the server will no update the previous model
+ parameters, the updates received in this round are discarded, and
+ the global model parameters remain the same.
+ """
+ agg_params, agg_metrics = super().aggregate_fit(
+ server_round=server_round, results=results, failures=failures
+ )
+ if agg_params is not None:
+ parameters = parameters_to_ndarrays(agg_params)
+ model_keys = [
+ k for k in self.model.state_dict().keys() if k.startswith("_body")
+ ]
+ params_dict = zip(model_keys, parameters)
+ state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
+ self.model.set_parameters(state_dict)
+
+ if self.save_path is not None:
+ # Save Model
+ torch.save(self.model, self.save_path / f"model-ep_{server_round}.pt")
+
+ return agg_params, agg_metrics
+
+ def aggregate_evaluate(
+ self,
+ server_round: int,
+ results: List[Tuple[ClientProxy, EvaluateRes]],
+ failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
+ ) -> Tuple[Optional[float], Dict[str, Scalar]]:
+ """Aggregate the received local parameters and store the test aggregated.
+
+ Args:
+ server_round: The current round of federated learning.
+ results: Successful updates from the
+ previously selected and configured clients. Each pair of
+ `(ClientProxy, FitRes` constitutes a successful update from one of the
+ previously selected clients. Not that not all previously selected
+ clients are necessarily included in this list: a client might drop out
+ and not submit a result. For each client that did not submit an update,
+ there should be an `Exception` in `failures`.
+ failures: Exceptions that occurred while the server
+ was waiting for client updates.
+
+ Returns
+ -------
+ Optional `float` representing the aggregated evaluation result. Aggregation
+ typically uses some variant of a weighted average.
+ """
+ aggregated_loss, aggregated_metrics = super().aggregate_evaluate(
+ server_round=server_round, results=results, failures=failures
+ )
+ _ = aggregated_metrics # Avoid unused variable warning
+
+ # Weigh accuracy of each client by number of examples used
+ accuracies: List[float] = []
+ for _, res in results:
+ accuracy: float = float(res.metrics["accuracy"])
+ accuracies.append(accuracy)
+ print(f"Round {server_round} accuracies: {accuracies}")
+
+ # Aggregate and print custom metric
+ averaged_accuracy = sum(accuracies) / len(accuracies)
+ print(f"Round {server_round} accuracy averaged: {averaged_accuracy}")
+ return aggregated_loss, {"accuracy": averaged_accuracy}
diff --git a/baselines/fedper/fedper/utils.py b/baselines/fedper/fedper/utils.py
new file mode 100644
index 000000000000..00b4c5318729
--- /dev/null
+++ b/baselines/fedper/fedper/utils.py
@@ -0,0 +1,225 @@
+"""Utility functions for FedPer."""
+import os
+import pickle
+import time
+from pathlib import Path
+from secrets import token_hex
+from typing import Callable, Optional, Type, Union
+
+import matplotlib.pyplot as plt
+import numpy as np
+from flwr.server.history import History
+from omegaconf import DictConfig
+
+from fedper.client import BaseClient, FedPerClient, get_client_fn_simulation
+from fedper.implemented_models.mobile_model import MobileNet, MobileNetModelSplit
+from fedper.implemented_models.resnet_model import ResNet, ResNetModelSplit
+
+
+def set_model_class(config: DictConfig) -> DictConfig:
+ """Set model class based on the model name in the config file."""
+ # Set the model class
+ if config.model_name.lower() == "resnet":
+ config.model["_target_"] = "fedper.implemented_models.resnet_model.ResNet"
+ elif config.model_name.lower() == "mobile":
+ config.model["_target_"] = "fedper.implemented_models.mobile_model.MobileNet"
+ else:
+ raise NotImplementedError(f"Model {config.model.name} not implemented")
+ return config
+
+
+def set_num_classes(config: DictConfig) -> DictConfig:
+    """Set the number of classes based on the dataset name in the config file.
+
+    Note: for "flickr" this also overrides `batch_size`, `num_clients` and
+    `clients_per_round` — it mutates more than the class count.
+    """
+    # Set the number of classes
+    if config.dataset.name.lower() == "cifar10":
+        config.model.num_classes = 10
+    elif config.dataset.name.lower() == "flickr":
+        config.model.num_classes = 5
+        # additionally for flickr
+        config.batch_size = 4
+        config.num_clients = 30
+        config.clients_per_round = 30
+    else:
+        raise NotImplementedError(f"Dataset {config.dataset.name} not implemented")
+    return config
+
+
+def set_server_target(config: DictConfig) -> DictConfig:
+    """Set the strategy's Hydra `_target_` based on `config.algorithm`.
+
+    "fedper" maps to the body-only aggregation pipeline, "fedavg" to the
+    full-model pipeline (both defined in `fedper.server`).
+    """
+    # Set the server target
+    if config.algorithm.lower() == "fedper":
+        config.strategy["_target_"] = "fedper.server.AggregateBodyStrategyPipeline"
+    elif config.algorithm.lower() == "fedavg":
+        config.strategy["_target_"] = "fedper.server.DefaultStrategyPipeline"
+    else:
+        raise NotImplementedError(f"Algorithm {config.algorithm} not implemented")
+    return config
+
+
+def set_client_state_save_path() -> str:
+ """Set the client state save path."""
+ client_state_save_path = time.strftime("%Y-%m-%d")
+ client_state_sub_path = time.strftime("%H-%M-%S")
+ client_state_save_path = (
+ f"./client_states/{client_state_save_path}/{client_state_sub_path}"
+ )
+ if not os.path.exists(client_state_save_path):
+ os.makedirs(client_state_save_path)
+ return client_state_save_path
+
+
+def get_client_fn(
+ config: DictConfig, client_state_save_path: str = ""
+) -> Callable[[str], Union[FedPerClient, BaseClient]]:
+ """Get client function."""
+ # Get algorithm
+ algorithm = config.algorithm.lower()
+ # Get client fn
+ if algorithm == "fedper":
+ client_fn = get_client_fn_simulation(
+ config=config,
+ client_state_save_path=client_state_save_path,
+ )
+ elif algorithm == "fedavg":
+ client_fn = get_client_fn_simulation(
+ config=config,
+ )
+ else:
+ raise NotImplementedError
+ return client_fn
+
+
+def get_create_model_fn(
+ config: DictConfig,
+) -> tuple[
+ Callable[[], Union[type[MobileNet], type[ResNet]]],
+ Union[type[MobileNetModelSplit], type[ResNetModelSplit]],
+]:
+ """Get create model function."""
+ device = config.server_device
+ split: Union[
+ Type[MobileNetModelSplit], Type[ResNetModelSplit]
+ ] = MobileNetModelSplit
+ if config.model_name.lower() == "mobile":
+
+ def create_model() -> Union[Type[MobileNet], Type[ResNet]]:
+ """Create initial MobileNet-v1 model."""
+ return MobileNet(
+ num_head_layers=config.model.num_head_layers,
+ num_classes=config.model.num_classes,
+ ).to(device)
+
+ elif config.model_name.lower() == "resnet":
+ split = ResNetModelSplit
+
+ def create_model() -> Union[Type[MobileNet], Type[ResNet]]:
+ """Create initial ResNet model."""
+ return ResNet(
+ num_head_layers=config.model.num_head_layers,
+ num_classes=config.model.num_classes,
+ ).to(device)
+
+ else:
+ raise NotImplementedError("Model not implemented, check name. ")
+ return create_model, split
+
+
+def plot_metric_from_history(
+    hist: History,
+    save_plot_path: Path,
+    suffix: Optional[str] = "",
+) -> None:
+    """Plot from Flower server History.
+
+    Parameters
+    ----------
+    hist : History
+        Object containing evaluation for all rounds.
+    save_plot_path : Path
+        Folder to save the plot to.
+    suffix: Optional[str]
+        Optional string to add at the end of the filename for the plot.
+    """
+    # metric_type is hard-coded, so the "centralized" branch below is
+    # currently dead code; it only affects the output filename otherwise.
+    metric_type = "distributed"
+    metric_dict = (
+        hist.metrics_centralized
+        if metric_type == "centralized"
+        else hist.metrics_distributed
+    )
+    _, values = zip(*metric_dict["accuracy"])
+
+    # let's extract decentralized loss (main metric reported in FedProx paper)
+    rounds_loss, values_loss = zip(*hist.losses_distributed)
+
+    _, axs = plt.subplots(nrows=2, ncols=1, sharex="row")
+    axs[0].plot(np.asarray(rounds_loss), np.asarray(values_loss))
+    # NOTE(review): the accuracy curve reuses the loss rounds as x-axis;
+    # assumes both series cover the same rounds — confirm for partial runs.
+    axs[1].plot(np.asarray(rounds_loss), np.asarray(values))
+
+    axs[0].set_ylabel("Loss")
+    axs[1].set_ylabel("Accuracy")
+
+    axs[0].grid()
+    axs[1].grid()
+    # plt.title(f"{metric_type.capitalize()} Validation - MNIST")
+    plt.xlabel("Rounds")
+    # plt.legend(loc="lower right")
+
+    plt.savefig(Path(save_plot_path) / Path(f"{metric_type}_metrics{suffix}.png"))
+    plt.close()
+
+
+def save_results_as_pickle(
+    history: History,
+    file_path: Union[str, Path],
+    default_filename: Optional[str] = "results.pkl",
+) -> None:
+    """Save results from simulation to pickle.
+
+    Parameters
+    ----------
+    history: History
+        History returned by start_simulation.
+    file_path: Union[str, Path]
+        Path to the file to create and store the history in.
+        If the path is a directory, default_filename will be appended.
+        If the path doesn't exist, it will be created. If the file exists, a
+        randomly generated suffix will be added to the file name. This
+        is done to avoid overwriting results.
+    default_filename: Optional[str]
+        File name used by default if file_path points to a directory instead
+        of a file. Default: "results.pkl"
+    """
+    path = Path(file_path)
+
+    # ensure path exists
+    path.mkdir(exist_ok=True, parents=True)
+
+    def _add_random_suffix(path_: Path):
+        """Add a random suffix to the file name."""
+        print(f"File `{path_}` exists! ")
+        suffix = token_hex(4)
+        print(f"New results to be saved with suffix: {suffix}")
+        return path_.parent / (path_.stem + "_" + suffix + ".pkl")
+
+    def _complete_path_with_default_name(path_: Path):
+        """Append the default file name to the path."""
+        print("Using default filename")
+        if default_filename is None:
+            return path_
+        return path_ / default_filename
+
+    if path.is_dir():
+        path = _complete_path_with_default_name(path)
+
+    if path.is_file():
+        path = _add_random_suffix(path)
+
+    print(f"Results will be saved into: {path}")
+    # data = {"history": history, **extra_results}
+    data = {"history": history}
+    # save results to pickle
+    with open(str(path), "wb") as handle:
+        pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)
diff --git a/baselines/fedper/pyproject.toml b/baselines/fedper/pyproject.toml
new file mode 100644
index 000000000000..efcdf25eface
--- /dev/null
+++ b/baselines/fedper/pyproject.toml
@@ -0,0 +1,143 @@
+[build-system]
+requires = ["poetry-core>=1.4.0"]
+build-backend = "poetry.masonry.api"
+
+[tool.poetry]
+name = "fedper" # <----- Ensure it matches the name of your baseline directory containing all the source code
+version = "1.0.0"
+description = "Federated Learning with Personalization Layers"
+license = "Apache-2.0"
+authors = ["The Flower Authors ", "William Lindskog "]
+readme = "README.md"
+homepage = "https://flower.dev"
+repository = "https://github.com/adap/flower"
+documentation = "https://flower.dev"
+classifiers = [
+ "Development Status :: 3 - Alpha",
+ "Intended Audience :: Developers",
+ "Intended Audience :: Science/Research",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: MacOS :: MacOS X",
+ "Operating System :: POSIX :: Linux",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3 :: Only",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: Implementation :: CPython",
+ "Topic :: Scientific/Engineering",
+ "Topic :: Scientific/Engineering :: Artificial Intelligence",
+ "Topic :: Scientific/Engineering :: Mathematics",
+ "Topic :: Software Development",
+ "Topic :: Software Development :: Libraries",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ "Typing :: Typed",
+]
+
+[tool.poetry.dependencies]
+python = ">=3.10.0, <3.11.0" # don't change this
+flwr = {extras = ["simulation"], version = "1.5.0" }
+hydra-core = "1.3.2" # don't change this
+pandas = "^2.0.3"
+matplotlib = "^3.7.2"
+tqdm = "^4.66.1"
+torch = { url = "https://download.pytorch.org/whl/cu117/torch-2.0.1%2Bcu117-cp310-cp310-linux_x86_64.whl"}
+torchvision = { url = "https://download.pytorch.org/whl/cu117/torchvision-0.15.2%2Bcu117-cp310-cp310-linux_x86_64.whl"}
+
+
+[tool.poetry.dev-dependencies]
+isort = "==5.11.5"
+black = "==23.1.0"
+docformatter = "==1.5.1"
+mypy = "==1.4.1"
+pylint = "==2.8.2"
+flake8 = "==3.9.2"
+pytest = "==6.2.4"
+pytest-watch = "==4.2.0"
+ruff = "==0.0.272"
+types-requests = "==2.27.7"
+
+[tool.isort]
+line_length = 88
+indent = " "
+multi_line_output = 3
+include_trailing_comma = true
+force_grid_wrap = 0
+use_parentheses = true
+
+[tool.black]
+line-length = 88
+target-version = ["py38", "py39", "py310", "py311"]
+
+[tool.pytest.ini_options]
+minversion = "6.2"
+addopts = "-qq"
+testpaths = [
+ "flwr_baselines",
+]
+
+[tool.mypy]
+ignore_missing_imports = true
+strict = false
+plugins = "numpy.typing.mypy_plugin"
+
+[tool.pylint."MESSAGES CONTROL"]
+disable = "bad-continuation,duplicate-code,too-few-public-methods,useless-import-alias"
+good-names = "i,j,k,_,x,y,X,Y"
+signature-mutators="hydra.main.main"
+
+[tool.pylint."TYPECHECK"]
+generated-members="numpy.*, torch.*, tensorflow.*"
+
+[[tool.mypy.overrides]]
+module = [
+ "importlib.metadata.*",
+ "importlib_metadata.*",
+]
+follow_imports = "skip"
+follow_imports_for_stubs = true
+disallow_untyped_calls = false
+
+[[tool.mypy.overrides]]
+module = "torch.*"
+follow_imports = "skip"
+follow_imports_for_stubs = true
+
+[tool.docformatter]
+wrap-summaries = 88
+wrap-descriptions = 88
+
+[tool.ruff]
+target-version = "py38"
+line-length = 88
+select = ["D", "E", "F", "W", "B", "ISC", "C4"]
+fixable = ["D", "E", "F", "W", "B", "ISC", "C4"]
+ignore = ["B024", "B027"]
+exclude = [
+ ".bzr",
+ ".direnv",
+ ".eggs",
+ ".git",
+ ".hg",
+ ".mypy_cache",
+ ".nox",
+ ".pants.d",
+ ".pytype",
+ ".ruff_cache",
+ ".svn",
+ ".tox",
+ ".venv",
+ "__pypackages__",
+ "_build",
+ "buck-out",
+ "build",
+ "dist",
+ "node_modules",
+ "venv",
+ "proto",
+]
+
+[tool.ruff.pydocstyle]
+convention = "numpy"
\ No newline at end of file
diff --git a/doc/source/ref-changelog.md b/doc/source/ref-changelog.md
index 313ba97b75d8..e987596d7d0e 100644
--- a/doc/source/ref-changelog.md
+++ b/doc/source/ref-changelog.md
@@ -38,6 +38,8 @@
- DepthFL [#2295](https://github.com/adap/flower/pull/2295)
+ - FedPer [#2266](https://github.com/adap/flower/pull/2266)
+
- **Update Flower Examples** ([#2384](https://github.com/adap/flower/pull/2384),[#2425](https://github.com/adap/flower/pull/2425), [#2526](https://github.com/adap/flower/pull/2526))
- **General updates to baselines** ([#2301](https://github.com/adap/flower/pull/2301), [#2305](https://github.com/adap/flower/pull/2305), [#2307](https://github.com/adap/flower/pull/2307), [#2327](https://github.com/adap/flower/pull/2327), [#2435](https://github.com/adap/flower/pull/2435))